""" Test the timeout tracker at each 40→25 transition. This script: 1. Loads the cached play clock readings with identified transitions 2. For each 40→25 transition, reads the timeout indicators BEFORE and AFTER 3. Compares the change in timeouts to determine if this was a timeout event 4. Compares results against ground truth Ground truth timeouts (Tennessee video): - 4:25 (HOME) -> transition at ~4:26 - 1:07:30 (AWAY) -> transition at ~1:07:24 - 1:09:40 (AWAY) -> transition at ~1:09:38 - 1:14:07 (HOME) -> transition at ~1:14:05 - 1:16:06 (HOME) -> transition at ~1:16:03 - 1:44:54 (AWAY) -> transition at ~1:44:48 Usage: # Default Tennessee video python scripts/test_timeout_at_transitions.py # Specific video python scripts/test_timeout_at_transitions.py --video "full_videos/OSU vs Texas 01.10.25.mkv" """ import argparse import json import logging import sys from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from detection.timeouts import DetectTimeouts logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) PROJECT_ROOT = Path(__file__).parent.parent OUTPUT_DIR = PROJECT_ROOT / "output" def get_video_basename(video_path: str) -> str: """Get a clean basename from video path for config naming.""" basename = Path(video_path).stem for char in [" ", ".", "-"]: basename = basename.replace(char, "_") while "__" in basename: basename = basename.replace("__", "_") return basename.strip("_") # Ground truth timeouts (timestamp in seconds, team) GROUND_TRUTH_TIMEOUTS = [ (4 * 60 + 25, "HOME"), # 4:25 (67 * 60 + 30, "AWAY"), # 1:07:30 (69 * 60 + 40, "AWAY"), # 1:09:40 (74 * 60 + 7, "HOME"), # 1:14:07 (76 * 60 + 6, "HOME"), # 1:16:06 (104 * 60 + 54, "AWAY"), # 1:44:54 ] def seconds_to_timestamp(seconds: float) -> str: """Convert seconds to timestamp string (H:MM:SS).""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) if hours > 0: return f"{hours}:{minutes:02d}:{secs:02d}" return f"{minutes}:{secs:02d}" def is_ground_truth_timeout(timestamp: float, tolerance: float = 10.0) -> Optional[str]: """ Check if a timestamp corresponds to a ground truth timeout. Args: timestamp: Timestamp to check tolerance: Seconds tolerance for matching Returns: Team name ("HOME" or "AWAY") if this is a ground truth timeout, None otherwise """ for gt_time, team in GROUND_TRUTH_TIMEOUTS: if abs(timestamp - gt_time) <= tolerance: return team return None def test_timeout_at_transitions(): """Test timeout detection at each 40→25 transition.""" # Load cached transitions cache_path = Path("output/cache/playclock_readings_full.json") if not cache_path.exists(): logger.error("Cache file not found: %s", cache_path) logger.error("Run cache_playclock_readings.py first") return with open(cache_path, "r", encoding="utf-8") as f: cache = json.load(f) transitions = cache["transitions_40_to_25"] logger.info("Loaded %d transitions from cache", len(transitions)) # Parse arguments parser = argparse.ArgumentParser(description="Test timeout tracking at transitions") parser.add_argument("--video", type=str, default="full_videos/OSU vs Tenn 12.21.24.mkv", help="Path to video file") args = parser.parse_args() video_path = args.video video_basename = get_video_basename(video_path) # Try video-specific timeout config first config_path = OUTPUT_DIR / f"{video_basename}_timeout_config.json" if not config_path.exists(): # Fall back to generic config config_path = Path("data/config/timeout_tracker_region.json") if not config_path.exists(): logger.error("Timeout config not found: %s", config_path) logger.error("Try running main.py first to generate timeout config for this video") return logger.info("Using timeout config: %s", config_path) # Initialize timeout tracker tracker = DetectTimeouts(config_path=str(config_path)) if not tracker.is_configured(): logger.error("Timeout tracker not configured") return # Open video cap = cv2.VideoCapture(video_path) if not cap.isOpened(): logger.error("Could not open video: %s", video_path) return fps = cap.get(cv2.CAP_PROP_FPS) logger.info("Video FPS: %.2f", fps) # Test each transition results: List[Dict[str, Any]] = [] # Read timeout state at video start for baseline cap.set(cv2.CAP_PROP_POS_FRAMES, int(30 * fps)) # 30 seconds in ret, frame = cap.read() if ret: baseline = tracker.read_timeouts(frame) logger.info("Baseline timeouts at 30s: HOME=%d, AWAY=%d, conf=%.2f", baseline.home_timeouts, baseline.away_timeouts, baseline.confidence) print("\n" + "=" * 100) print("TIMEOUT DETECTION AT 40→25 TRANSITIONS") print("=" * 100) print(f"{'#':<4} {'Timestamp':<12} {'Before (H,A)':<14} {'After (H,A)':<14} {'Change':<12} {'GT Team':<10} {'Detected':<10} {'Status':<10}") print("-" * 100) for i, transition in enumerate(transitions, 1): timestamp = transition["timestamp"] prev_timestamp = transition.get("prev_timestamp", timestamp - 1.0) # Read frame BEFORE transition (when clock was 40) - 2 seconds before before_time = prev_timestamp - 2.0 frame_before = int(before_time * fps) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_before) ret, frame = cap.read() if not ret: continue reading_before = tracker.read_timeouts(frame) # Read frame AFTER transition - try multiple times to catch delayed update # Check at 2s, 4s, and 6s after the transition reading_after = None for delay in [2.0, 4.0, 6.0]: after_time = timestamp + delay frame_after = int(after_time * fps) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_after) ret, frame = cap.read() if not ret: continue reading_after = tracker.read_timeouts(frame) # If we detected a change, use this reading home_diff = reading_before.home_timeouts - reading_after.home_timeouts away_diff = reading_before.away_timeouts - reading_after.away_timeouts if home_diff > 0 or away_diff > 0: break # Found the change if reading_after is None: continue # Determine change home_change = reading_before.home_timeouts - reading_after.home_timeouts away_change = reading_before.away_timeouts - reading_after.away_timeouts # Validate: A valid timeout should show EXACTLY ONE team decreasing by 1 # while the other team stays the same. If both teams change, the scorebug # likely disappeared (e.g., replay/commercial) and this is not a real timeout. detected_team = None is_valid_timeout = False # Minimum confidence threshold for reliable readings MIN_CONFIDENCE = 0.5 # Check for valid timeout pattern: one team -1, other team unchanged # Also require high confidence on both readings if home_change == 1 and away_change == 0: if reading_before.confidence >= MIN_CONFIDENCE and reading_after.confidence >= MIN_CONFIDENCE: detected_team = "HOME" is_valid_timeout = True elif away_change == 1 and home_change == 0: if reading_before.confidence >= MIN_CONFIDENCE and reading_after.confidence >= MIN_CONFIDENCE: detected_team = "AWAY" is_valid_timeout = True elif home_change > 0 or away_change > 0: # Both teams changed, or change > 1 - likely scorebug visibility issue # Log but don't detect as timeout pass # Check ground truth gt_team = is_ground_truth_timeout(timestamp) # Determine status if gt_team is not None: if detected_team == gt_team: status = "✓ TP" # True positive elif detected_team is not None: status = "✗ WRONG" # Wrong team else: status = "✗ FN" # False negative else: if detected_team is not None: status = "✗ FP" # False positive else: status = "✓ TN" # True negative (correctly not detected) before_str = f"({reading_before.home_timeouts},{reading_before.away_timeouts})" after_str = f"({reading_after.home_timeouts},{reading_after.away_timeouts})" change_str = f"H:{home_change:+d} A:{away_change:+d}" print(f"{i:<4} {transition['timestamp_str']:<12} {before_str:<14} {after_str:<14} {change_str:<12} {gt_team or '-':<10} {detected_team or '-':<10} {status:<10}") results.append( { "index": i, "timestamp": timestamp, "timestamp_str": transition["timestamp_str"], "before_home": reading_before.home_timeouts, "before_away": reading_before.away_timeouts, "before_conf": reading_before.confidence, "after_home": reading_after.home_timeouts, "after_away": reading_after.away_timeouts, "after_conf": reading_after.confidence, "home_change": home_change, "away_change": away_change, "detected_team": detected_team, "ground_truth_team": gt_team, "status": status, } ) cap.release() # Summary statistics print("=" * 100) tp = sum(1 for r in results if "TP" in r["status"]) fp = sum(1 for r in results if "FP" in r["status"]) fn = sum(1 for r in results if "FN" in r["status"]) tn = sum(1 for r in results if "TN" in r["status"]) wrong = sum(1 for r in results if "WRONG" in r["status"]) print(f"\nSUMMARY:") print(f" True Positives (correct timeouts): {tp}") print(f" False Positives (spurious detections): {fp}") print(f" False Negatives (missed timeouts): {fn}") print(f" Wrong Team: {wrong}") print(f" True Negatives (correct non-timeout): {tn}") if tp + fn > 0: recall = tp / (tp + fn) print(f"\n Recall (of ground truth timeouts): {recall:.1%} ({tp}/{tp + fn})") if tp + fp > 0: precision = tp / (tp + fp) print(f" Precision (of detected timeouts): {precision:.1%} ({tp}/{tp + fp})") # Save detailed results output_path = Path("output/cache/timeout_tracker_evaluation.json") with open(output_path, "w", encoding="utf-8") as f: json.dump( { "summary": {"tp": tp, "fp": fp, "fn": fn, "tn": tn, "wrong": wrong}, "results": results, }, f, indent=2, ) print(f"\nDetailed results saved to: {output_path}") def visualize_timeout_regions(timestamp: float): """Visualize timeout regions at a specific timestamp for debugging.""" config_path = Path("data/config/timeout_tracker_region.json") tracker = DetectTimeouts(config_path=str(config_path)) video_path = "full_videos/OSU vs Tenn 12.21.24.mkv" cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) frame_num = int(timestamp * fps) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) ret, frame = cap.read() cap.release() if not ret: logger.error("Could not read frame at %.1fs", timestamp) return reading = tracker.read_timeouts(frame) vis_frame = tracker.visualize(frame, reading) output_path = f"output/debug/timeout_tracker/vis_{int(timestamp)}.png" Path(output_path).parent.mkdir(parents=True, exist_ok=True) cv2.imwrite(output_path, vis_frame) logger.info("Saved visualization to %s", output_path) logger.info(" HOME: %d, AWAY: %d, conf: %.2f", reading.home_timeouts, reading.away_timeouts, reading.confidence) if __name__ == "__main__": test_timeout_at_transitions() # Optionally visualize at specific timestamps # visualize_timeout_regions(265) # Before first timeout at 4:25 # visualize_timeout_regions(270) # After first timeout