BakoAI / analysis /team_analysis_old.py
Okidi Norbert
Deployment fix: clean backend only
c6abe34
"""
Team Analysis Pipeline - Wraps template components for team video analysis.
This module uses the existing template components (PlayerTracker, BallTracker, etc.)
to analyze multi-player basketball footage for team-level insights.
"""
import os
import sys
import shutil
from typing import Dict, Any, List, Optional
# Add parent directory for template imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def clear_video_stubs(stub_root: str) -> bool:
"""
Clear all stub files for a video to prepare for fresh analysis.
Args:
stub_root: Root directory containing stubs for this video
Returns:
True if successful, False otherwise
"""
try:
if os.path.exists(stub_root):
shutil.rmtree(stub_root)
os.makedirs(stub_root, exist_ok=True)
return True
except Exception as e:
print(f"⚠️ Warning: Could not clear video stubs at {stub_root}: {e}")
return False
return True
async def run_team_analysis(video_path: str, options: Optional[Dict[str, Any]] = None, video_id: Optional[str] = None) -> Dict[str, Any]:
"""
Run team analysis pipeline on a video.
"""
# Import template components (keep aligned with back-end/main.py)
from utils import read_video, save_video
from trackers import PlayerTracker, BallTracker
from team_assigner import TeamAssigner
from court_keypoint_detector import CourtKeypointDetector
from ball_aquisition.ball_aquisition_detector import BallAquisitionDetector
from pass_and_interception_detector import PassAndInterceptionDetector
from tactical_view_converter import TacticalViewConverter
from speed_and_distance_calculator import SpeedAndDistanceCalculator
from shot_detector import ShotDetector
from drawers import (
PlayerTracksDrawer,
BallTracksDrawer,
FrameNumberDrawer,
TeamBallControlDrawer,
PassInterceptionDrawer,
TacticalViewDrawer,
SpeedAndDistanceDrawer,
ShotDrawer,
)
from configs import (
STUBS_DEFAULT_PATH,
PLAYER_DETECTOR_PATH,
BALL_DETECTOR_PATH,
TEAM_MODEL_PATH,
COURT_KEYPOINT_DETECTOR_PATH,
)
from app.services.supabase_client import get_supabase_service
supabase = get_supabase_service()
async def update_progress(step: str, percent: int):
if video_id and supabase:
try:
await supabase.update("videos", video_id, {
"current_step": step,
"progress_percent": percent
})
except Exception as e:
print(f"Error updating progress: {e}")
await update_progress("Reading video", 5)
video_frames = read_video(video_path)
total_frames = len(video_frames)
if total_frames == 0:
return {"error": "Could not read video frames", "total_frames": 0}
# Options (align with CLI args in back-end/main.py)
options = options or {}
our_team_jersey = str(options.get("our_team_jersey") or "white jersey")
opponent_jersey = str(options.get("opponent_jersey") or "dark blue jersey")
try:
our_team_id = int(options.get("our_team_id") or 1)
except Exception:
our_team_id = 1
our_team_id = 1 if our_team_id not in (1, 2) else our_team_id
max_players = 10
try:
max_players = int(options.get("max_players_on_court") or max_players)
except Exception:
pass
max_players = max(1, min(20, max_players))
# Stub management options
clear_stubs_after = bool(options.get("clear_stubs_after", True))
read_from_stub = bool(options.get("read_from_stub", False))
# Use a per-video stub folder to avoid cross-video contamination.
stub_root = os.path.join(STUBS_DEFAULT_PATH, "api", str(video_id or "no-id"))
os.makedirs(stub_root, exist_ok=True)
# Initialize trackers/detectors
await update_progress("Initializing tracking models", 10)
player_tracker = PlayerTracker(PLAYER_DETECTOR_PATH)
ball_tracker = BallTracker(BALL_DETECTOR_PATH)
court_keypoint_detector = CourtKeypointDetector(COURT_KEYPOINT_DETECTOR_PATH)
# Detection & tracking (this path already enforces a max player limit inside PlayerTracker)
await update_progress("Tracking players and referees", 20)
player_tracks = player_tracker.get_object_tracks(
video_frames,
read_from_stub=read_from_stub,
stub_path=os.path.join(stub_root, "player_track_stubs.pkl"),
)
# Optional: tighten per-frame player cap if caller requests a smaller number than 10
if max_players < 10:
capped_tracks: List[Dict[int, Dict[str, Any]]] = []
for frame_tracks in player_tracks:
players = [(tid, t) for tid, t in (frame_tracks or {}).items() if str(t.get("class", "")).lower() == "player"]
refs = [(tid, t) for tid, t in (frame_tracks or {}).items() if str(t.get("class", "")).lower() == "referee"]
players.sort(key=lambda x: float(x[1].get("confidence", 0.0)), reverse=True)
keep = {tid: t for tid, t in players[:max_players]}
for tid, t in refs:
keep[tid] = t
capped_tracks.append(keep)
player_tracks = capped_tracks
await update_progress("Detecting and tracking ball", 35)
ball_tracks = ball_tracker.get_object_tracks(
video_frames,
read_from_stub=read_from_stub,
stub_path=os.path.join(stub_root, "ball_track_stubs.pkl"),
)
# Ball cleaning (improves continuity & reduces false positives)
ball_tracks = ball_tracker.remove_wrong_detections(ball_tracks)
ball_tracks = ball_tracker.interpolate_ball_positions(ball_tracks)
# Court keypoints
await update_progress("Detecting court layout", 50)
court_keypoints = court_keypoint_detector.get_court_keypoints(
video_frames,
read_from_stub=read_from_stub,
stub_path=os.path.join(stub_root, "court_key_points_stub.pkl"),
)
# Team assignment (jersey descriptions are critical for team accounts)
await update_progress("Assigning players to teams", 65)
team_assigner = TeamAssigner(team_1_class_name=our_team_jersey, team_2_class_name=opponent_jersey)
player_assignment = team_assigner.get_player_teams_across_frames(
video_frames,
player_tracks,
read_from_stub=read_from_stub,
stub_path=os.path.join(stub_root, "player_assignment_stub.pkl"),
)
# Ball possession + events
await update_progress("Analyzing ball possession", 75)
ball_aquisition_detector = BallAquisitionDetector()
ball_possession = ball_aquisition_detector.detect_ball_possession(player_tracks, ball_tracks)
await update_progress("Detecting passes and interceptions", 82)
pass_detector = PassAndInterceptionDetector()
passes = pass_detector.detect_passes(ball_possession, player_assignment)
interceptions = pass_detector.detect_interceptions(ball_possession, player_assignment)
# Tactical view and speed calculations
tactical_converter = TacticalViewConverter(court_image_path="./images/basketball_court.png")
court_keypoints = tactical_converter.validate_keypoints(court_keypoints)
tactical_positions = tactical_converter.transform_players_to_tactical_view(court_keypoints, player_tracks)
# Transform ball to tactical view
ball_xy_frames = []
for f_tracks in ball_tracks:
f_ball = {}
for b_id, b_data in (f_tracks or {}).items():
bbox = (b_data or {}).get("bbox")
if not bbox:
continue
f_ball[b_id] = [(bbox[0] + bbox[2]) / 2, bbox[3]] # bottom-center
ball_xy_frames.append(f_ball)
tactical_ball_positions = tactical_converter.transform_points_to_tactical(court_keypoints, ball_xy_frames)
speed_calculator = SpeedAndDistanceCalculator(
tactical_converter.width,
tactical_converter.height,
tactical_converter.actual_width_in_meters,
tactical_converter.actual_height_in_meters
)
distances = speed_calculator.calculate_distance(tactical_positions)
speeds = speed_calculator.calculate_speed(distances)
# Calculate team possession percentages
team_1_possession = 0
team_2_possession = 0
for frame_idx, (possession, assignment) in enumerate(zip(ball_possession, player_assignment)):
if possession != -1 and possession in assignment:
team = assignment[possession]
if team == 1:
team_1_possession += 1
elif team == 2:
team_2_possession += 1
total_possession = team_1_possession + team_2_possession
team_1_pct = (team_1_possession / total_possession * 100) if total_possession > 0 else 50
team_2_pct = (team_2_possession / total_possession * 100) if total_possession > 0 else 50
# Count unique tracked entities (players + referees)
unique_players = set()
for frame_tracks in player_tracks:
unique_players.update((frame_tracks or {}).keys())
# Shot Detection and Analysis (also gives hoop locations for overlays)
await update_progress("Detecting hoop and shots", 88)
try:
shot_detector = ShotDetector(
hoop_detection_model_path=TEAM_MODEL_PATH,
min_shot_arc_height=50,
hoop_proximity_threshold=100,
trajectory_window=30,
success_time_window=15
)
hoop_detections = shot_detector.detect_hoop_locations(
video_frames,
read_from_stub=read_from_stub,
stub_path=os.path.join(stub_root, "hoop_detections_stub.pkl"),
)
# Get video FPS
fps = 30 # Default
try:
import cv2
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 30
cap.release()
except:
pass
# Detect shots
shots = shot_detector.detect_shots(
ball_tracks,
hoop_detections,
player_tracks=player_tracks,
player_assignment=player_assignment,
ball_possession=ball_possession,
fps=fps,
court_keypoints=court_keypoints
)
# Calculate overall shot statistics
shot_stats = shot_detector.calculate_shot_statistics(shots)
# Break down shots by team
team_1_shots = []
team_2_shots = []
for shot in shots:
start_frame = int(shot['start_frame'])
if start_frame < len(ball_possession) and start_frame < len(player_assignment):
player_with_ball = ball_possession[start_frame]
if player_with_ball != -1 and player_with_ball in player_assignment[start_frame]:
team = player_assignment[start_frame][player_with_ball]
# Update the shot in the main list so it's attribution stays for events
shot['team_id'] = int(team)
shot['player_id'] = int(player_with_ball)
if team == 1:
team_1_shots.append(shot)
elif team == 2:
team_2_shots.append(shot)
team_1_shot_stats = shot_detector.calculate_shot_statistics(team_1_shots)
team_2_shot_stats = shot_detector.calculate_shot_statistics(team_2_shots)
except Exception as e:
print(f"Shot detection failed: {e}")
hoop_detections = [None for _ in range(total_frames)]
shot_stats = {
'total_attempts': 0,
'total_made': 0,
'total_missed': 0,
'overall_percentage': 0.0,
'by_type': {},
'shots': []
}
team_1_shot_stats = shot_stats.copy()
team_2_shot_stats = shot_stats.copy()
# Render annotated output video (clean, consistent overlays)
await update_progress("Rendering annotated video", 92)
try:
if our_team_id == 1:
our_color = [0, 120, 255]
opp_color = [0, 0, 200]
else:
our_color = [0, 0, 200]
opp_color = [0, 120, 255]
player_tracks_drawer = PlayerTracksDrawer(team_1_color=our_color, team_2_color=opp_color)
ball_tracks_drawer = BallTracksDrawer()
frame_number_drawer = FrameNumberDrawer()
team_ball_control_drawer = TeamBallControlDrawer()
pass_and_interceptions_drawer = PassInterceptionDrawer()
tactical_view_drawer = TacticalViewDrawer()
speed_and_distance_drawer = SpeedAndDistanceDrawer()
shot_drawer = ShotDrawer()
output_video_frames = player_tracks_drawer.draw(video_frames, player_tracks, player_assignment, ball_possession)
output_video_frames = ball_tracks_drawer.draw(output_video_frames, ball_tracks)
output_video_frames = frame_number_drawer.draw(output_video_frames)
output_video_frames = team_ball_control_drawer.draw(output_video_frames, player_assignment, ball_possession)
output_video_frames = pass_and_interceptions_drawer.draw(output_video_frames, passes, interceptions)
output_video_frames = shot_drawer.draw(output_video_frames, shots, hoop_detections=hoop_detections)
output_video_frames = speed_and_distance_drawer.draw(output_video_frames, player_tracks, distances, speeds)
output_video_frames = tactical_view_drawer.draw(
output_video_frames,
tactical_converter.court_image_path,
tactical_converter.width,
tactical_converter.height,
tactical_converter.key_points,
tactical_positions,
player_assignment,
ball_possession,
)
annotated_path = os.path.join("output_videos", "annotated", f"{video_id or 'latest'}.mp4")
save_video(output_video_frames, annotated_path)
except Exception as e:
print(f"Annotated video rendering failed: {e}")
# Build per-frame detections for overlays (optional, can be large)
detections_stride = 1
max_detections = 200_000
if options:
try:
detections_stride = int(options.get("detections_stride", detections_stride))
except Exception:
pass
try:
max_detections = int(options.get("max_detections", max_detections))
except Exception:
pass
detections_stride = max(1, min(30, detections_stride))
max_detections = max(1_000, max_detections)
detections: List[Dict[str, Any]] = []
for frame_idx in range(0, total_frames, detections_stride):
# Players
assignment = player_assignment[frame_idx] if frame_idx < len(player_assignment) else {}
possession_player = ball_possession[frame_idx] if frame_idx < len(ball_possession) else -1
current_player_tracks = player_tracks[frame_idx] if frame_idx < len(player_tracks) else []
for track_id, track in (current_player_tracks or {}).items():
bbox = track.get("bbox")
if not bbox:
continue
# Get tactical position for this player in this frame
current_tactical = tactical_positions[frame_idx] if frame_idx < len(tactical_positions) else {}
player_tactical = current_tactical.get(track_id)
detections.append({
"frame": int(frame_idx),
"object_type": str(track.get("class", "player")),
"track_id": int(track_id),
"bbox": [float(b) for b in bbox] if bbox else None,
"confidence": float(track.get("confidence", 1.0)),
"team_id": int(assignment.get(track_id)) if assignment.get(track_id) is not None else None,
"has_ball": bool(possession_player == track_id),
"tactical_x": float(player_tactical[0]) if player_tactical else None,
"tactical_y": float(player_tactical[1]) if player_tactical else None,
"keypoints": None,
})
if len(detections) >= max_detections:
break
if len(detections) >= max_detections:
break
# Ball
current_ball_tracks = ball_tracks[frame_idx] if frame_idx < len(ball_tracks) else []
ball = (current_ball_tracks or {}).get(1)
if ball and ball.get("bbox"):
# Get tactical ball position
current_ball_tactical = tactical_ball_positions[frame_idx] if frame_idx < len(tactical_ball_positions) else {}
ball_tactical = current_ball_tactical.get(1)
detections.append({
"frame": int(frame_idx),
"object_type": "ball",
"track_id": 1,
"bbox": [float(b) for b in ball.get("bbox")] if ball.get("bbox") else None,
"confidence": float(ball.get("confidence", 1.0)),
"team_id": None,
"has_ball": False,
"tactical_x": float(ball_tactical[0]) if ball_tactical else None,
"tactical_y": float(ball_tactical[1]) if ball_tactical else None,
"keypoints": None,
})
# Hoops
hoop = hoop_detections[frame_idx] if frame_idx < len(hoop_detections) else None
if hoop and hoop.get("bbox"):
detections.append({
"frame": int(frame_idx),
"object_type": "hoop",
"track_id": 0,
"bbox": [float(b) for b in hoop.get("bbox")] if hoop.get("bbox") else None,
"confidence": float(hoop.get("confidence", 1.0)),
"team_id": None,
"has_ball": False,
"keypoints": None,
})
# Build events list
events = []
for frame_num, team_id in enumerate(passes):
if team_id != -1:
events.append({
"event_type": "pass",
"frame": frame_num,
"timestamp_seconds": frame_num / fps,
"details": {"team": team_id}
})
for frame_num, team_id in enumerate(interceptions):
if team_id != -1:
events.append({
"event_type": "interception",
"frame": frame_num,
"timestamp_seconds": frame_num / fps,
"details": {"team": team_id}
})
# Add shots to events
for shot in shots:
events.append({
"event_type": "shot",
"frame": int(shot['start_frame']),
"timestamp_seconds": float(shot['start_frame'] / fps),
"details": {
"outcome": str(shot['outcome']),
"team": int(shot.get('team_id')) if shot.get('team_id') is not None else None,
"player": int(shot.get('player_id')) if shot.get('player_id') is not None else None,
"type": str(shot.get('shot_type', 'unknown'))
}
})
# Get video duration
fps = 30 # Default assumption
try:
import cv2
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 30
cap.release()
except:
pass
duration_seconds = total_frames / fps
# ============================================
# ADVANCED ANALYTICS (OPT-IN)
# ============================================
advanced_analytics = None
if options and options.get("enable_advanced_analytics", False):
try:
print("Running advanced analytics...")
from analytics_engine import AnalyticsCoordinator
coordinator = AnalyticsCoordinator()
advanced_analytics = coordinator.process_all(
video_frames=video_frames,
player_tracks=player_tracks,
ball_tracks=ball_tracks,
tactical_positions=tactical_positions,
player_assignment=player_assignment,
ball_possession=ball_possession,
events=events,
shots=shots,
court_keypoints=court_keypoints,
speeds=speeds,
video_path=video_path,
fps=fps
)
print(f"Advanced analytics complete: {len(advanced_analytics.get('modules_executed', []))} modules succeeded")
except Exception as e:
print(f"Advanced analytics failed: {e}")
advanced_analytics = {"error": str(e), "status": "failed"}
# Build result dictionary
result = {
"total_frames": int(total_frames),
"duration_seconds": float(duration_seconds),
"players_detected": int(len(unique_players)),
"team_1_possession_percent": float(round(team_1_pct, 1)),
"team_2_possession_percent": float(round(team_2_pct, 1)),
"total_passes": int(len([p for p in passes if p != -1])),
"total_interceptions": int(len([i for i in interceptions if i != -1])),
# Shot statistics
"shot_attempts": int(shot_stats['total_attempts']),
"shots_made": int(shot_stats['total_made']),
"shots_missed": int(shot_stats['total_missed']),
"shooting_percentage": float(shot_stats['overall_percentage']),
"shot_breakdown_by_type": shot_stats['by_type'],
# Team 1 shooting
"team_1_shot_attempts": int(team_1_shot_stats['total_attempts']),
"team_1_shots_made": int(team_1_shot_stats['total_made']),
# Team 2 shooting
"team_2_shot_attempts": int(team_2_shot_stats['total_attempts']),
"team_2_shots_made": int(team_2_shot_stats['total_made']),
"events": events,
"detections": detections,
}
# Add advanced analytics if available
if advanced_analytics:
result["advanced_analytics"] = advanced_analytics
# Clear stubs if requested (ensures fresh analysis on next run)
if clear_stubs_after:
await update_progress("Cleaning up cached data", 98)
try:
clear_video_stubs(stub_root)
print(f"✅ Cleared stubs for video {video_id or 'unknown'}")
except Exception as e:
print(f"⚠️ Could not clear stubs: {e}")
return result