Spaces:
Sleeping
Sleeping
"""
Test the timeout tracker at each 40→25 transition.

This script:
1. Loads the cached play clock readings with identified transitions
2. For each 40→25 transition, reads the timeout indicators BEFORE and AFTER
3. Compares the change in timeouts to determine if this was a timeout event
4. Compares results against ground truth

Ground truth timeouts (Tennessee video):
- 4:25 (HOME) -> transition at ~4:26
- 1:07:30 (AWAY) -> transition at ~1:07:24
- 1:09:40 (AWAY) -> transition at ~1:09:38
- 1:14:07 (HOME) -> transition at ~1:14:05
- 1:16:06 (HOME) -> transition at ~1:16:03
- 1:44:54 (AWAY) -> transition at ~1:44:48

Usage:
    # Default Tennessee video
    python scripts/test_timeout_at_transitions.py

    # Specific video
    python scripts/test_timeout_at_transitions.py --video "full_videos/OSU vs Texas 01.10.25.mkv"
"""
| import argparse | |
| import json | |
| import logging | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import cv2 | |
| import numpy as np | |
| # Add src to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / "src")) | |
| from detection.timeouts import DetectTimeouts | |
# Emit timestamped log lines at INFO level and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# The project root is taken to be the directory one level above this
# script's own directory; generated artefacts go in its "output" folder.
PROJECT_ROOT = Path(__file__).parent.parent
OUTPUT_DIR = PROJECT_ROOT.joinpath("output")
def get_video_basename(video_path: str) -> str:
    """Derive a config-friendly identifier from a video file path.

    The file stem (name without directory or final extension) has every
    space, dot and hyphen turned into an underscore. Runs of underscores are
    then collapsed to a single one and leading/trailing underscores removed.

    Args:
        video_path: Path to the video file.

    Returns:
        The cleaned stem, e.g. ``"OSU_vs_Tenn_12_21_24"``.
    """
    stem = Path(video_path).stem
    underscored = stem.translate(str.maketrans({" ": "_", ".": "_", "-": "_"}))
    # Splitting on "_" and dropping the empty pieces both collapses repeated
    # underscores and trims them from the ends.
    return "_".join(piece for piece in underscored.split("_") if piece)
# Hand-labelled timeouts as (video time in seconds, team that called it).
GROUND_TRUTH_TIMEOUTS = [
    (0 * 3600 + 4 * 60 + 25, "HOME"),  # 0:04:25
    (1 * 3600 + 7 * 60 + 30, "AWAY"),  # 1:07:30
    (1 * 3600 + 9 * 60 + 40, "AWAY"),  # 1:09:40
    (1 * 3600 + 14 * 60 + 7, "HOME"),  # 1:14:07
    (1 * 3600 + 16 * 60 + 6, "HOME"),  # 1:16:06
    (1 * 3600 + 44 * 60 + 54, "AWAY"),  # 1:44:54
]
def seconds_to_timestamp(seconds: float) -> str:
    """Format a time offset in seconds as ``M:SS``, or ``H:MM:SS`` from one hour up.

    Fractional seconds are dropped, not rounded.

    Args:
        seconds: Offset in seconds.

    Returns:
        The formatted timestamp string.
    """
    whole_hours, within_hour = divmod(seconds, 3600)
    hh = int(whole_hours)
    mm = int(within_hour // 60)
    ss = int(seconds % 60)
    if hh <= 0:
        # Under an hour: the minutes field is not zero-padded.
        return f"{mm}:{ss:02d}"
    return f"{hh}:{mm:02d}:{ss:02d}"
def is_ground_truth_timeout(
    timestamp: float,
    tolerance: float = 10.0,
    ground_truth: Optional[List[Tuple[float, str]]] = None,
) -> Optional[str]:
    """
    Check if a timestamp corresponds to a ground truth timeout.

    When several ground truth entries fall inside the tolerance window, the
    one closest in time to ``timestamp`` wins, so the answer does not depend
    on the order of the table.

    Args:
        timestamp: Timestamp to check, in seconds
        tolerance: Seconds tolerance for matching (inclusive)
        ground_truth: Optional list of (seconds, team) pairs to match against;
            defaults to the module-level GROUND_TRUTH_TIMEOUTS table

    Returns:
        Team name ("HOME" or "AWAY") if this is a ground truth timeout, None otherwise
    """
    if ground_truth is None:
        ground_truth = GROUND_TRUTH_TIMEOUTS

    candidates = [
        (abs(timestamp - gt_time), team)
        for gt_time, team in ground_truth
        if abs(timestamp - gt_time) <= tolerance
    ]
    if not candidates:
        return None
    # min() returns the first of equally distant candidates, so exact ties
    # still resolve in table order.
    _, closest_team = min(candidates, key=lambda candidate: candidate[0])
    return closest_team
# Both the BEFORE and AFTER readings must reach this confidence for a change
# in the indicators to be accepted as a timeout.
_MIN_CONFIDENCE = 0.5

# Seconds after the transition at which the indicators are re-read; several
# offsets are tried to catch a delayed scorebug update.
_AFTER_DELAYS = (2.0, 4.0, 6.0)


def _read_timeouts_at(cap, tracker, fps, seconds):
    """Seek to ``seconds`` and return the tracker's reading, or None if no frame could be read."""
    # Clamp so that offsets before the start of the video never produce a negative frame index.
    frame_index = max(int(seconds * fps), 0)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
    ok, frame = cap.read()
    if not ok:
        return None
    return tracker.read_timeouts(frame)


def _read_after_transition(cap, tracker, fps, timestamp, reading_before):
    """Return the reading after the transition, stopping at the first delay where a count dropped."""
    reading_after = None
    for delay in _AFTER_DELAYS:
        candidate = _read_timeouts_at(cap, tracker, fps, timestamp + delay)
        if candidate is None:
            continue
        reading_after = candidate
        if (
            reading_before.home_timeouts > candidate.home_timeouts
            or reading_before.away_timeouts > candidate.away_timeouts
        ):
            break  # Found the change
    return reading_after


def _detect_timeout_team(home_change, away_change, conf_before, conf_after):
    """Return "HOME"/"AWAY" when exactly one team lost exactly one timeout on confident readings."""
    if conf_before < _MIN_CONFIDENCE or conf_after < _MIN_CONFIDENCE:
        return None
    if home_change == 1 and away_change == 0:
        return "HOME"
    if away_change == 1 and home_change == 0:
        return "AWAY"
    # Both teams changed, or a change other than one: likely a scorebug
    # visibility issue (e.g. replay/commercial), not a real timeout.
    return None


def _classify_status(gt_team, detected_team):
    """Label one transition as TP / WRONG / FN / FP / TN from ground truth vs. detection."""
    if gt_team is None:
        return "✗ FP" if detected_team is not None else "✓ TN"
    if detected_team == gt_team:
        return "✓ TP"  # True positive
    if detected_team is not None:
        return "✗ WRONG"  # Timeout detected, but for the wrong team
    return "✗ FN"  # False negative


def test_timeout_at_transitions():
    """Test timeout detection at each 40→25 transition.

    Reads ``--video`` from the command line, loads the cached transitions,
    compares the timeout indicators before and after every transition, prints
    a per-transition table plus summary statistics, and writes the detailed
    results to ``output/cache/timeout_tracker_evaluation.json``.

    Problems (missing cache or config, unconfigured tracker, unreadable
    video) are logged and the function returns without raising.

    NOTE: the cache path is fixed and does not depend on ``--video``, and the
    ground truth table describes a single video, so the reported statistics
    are only meaningful when cache, video and ground truth belong together.
    """
    # Parse arguments first so --help and bad flags are handled before any file I/O.
    parser = argparse.ArgumentParser(description="Test timeout tracking at transitions")
    parser.add_argument("--video", type=str, default="full_videos/OSU vs Tenn 12.21.24.mkv", help="Path to video file")
    args = parser.parse_args()
    video_path = args.video
    video_basename = get_video_basename(video_path)

    # Load cached transitions
    cache_path = Path("output/cache/playclock_readings_full.json")
    if not cache_path.exists():
        logger.error("Cache file not found: %s", cache_path)
        logger.error("Run cache_playclock_readings.py first")
        return
    with open(cache_path, "r", encoding="utf-8") as f:
        cache = json.load(f)
    transitions = cache.get("transitions_40_to_25")
    if transitions is None:
        logger.error("Cache file %s has no 'transitions_40_to_25' entry", cache_path)
        return
    logger.info("Loaded %d transitions from cache", len(transitions))

    # Try video-specific timeout config first, then fall back to the generic one
    config_path = OUTPUT_DIR / f"{video_basename}_timeout_config.json"
    if not config_path.exists():
        config_path = Path("data/config/timeout_tracker_region.json")
    if not config_path.exists():
        logger.error("Timeout config not found: %s", config_path)
        logger.error("Try running main.py first to generate timeout config for this video")
        return
    logger.info("Using timeout config: %s", config_path)

    # Initialize timeout tracker
    tracker = DetectTimeouts(config_path=str(config_path))
    if not tracker.is_configured():
        logger.error("Timeout tracker not configured")
        return

    results: List[Dict[str, Any]] = []

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            logger.error("Could not open video: %s", video_path)
            return
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Without a frame rate every seek would land on frame 0.
            logger.error("Video reports invalid FPS (%.2f): %s", fps, video_path)
            return
        logger.info("Video FPS: %.2f", fps)

        # Read timeout state 30 seconds in as a baseline (logged only)
        baseline = _read_timeouts_at(cap, tracker, fps, 30.0)
        if baseline is not None:
            logger.info("Baseline timeouts at 30s: HOME=%d, AWAY=%d, conf=%.2f", baseline.home_timeouts, baseline.away_timeouts, baseline.confidence)

        print("\n" + "=" * 100)
        print("TIMEOUT DETECTION AT 40→25 TRANSITIONS")
        print("=" * 100)
        print(f"{'#':<4} {'Timestamp':<12} {'Before (H,A)':<14} {'After (H,A)':<14} {'Change':<12} {'GT Team':<10} {'Detected':<10} {'Status':<10}")
        print("-" * 100)

        for i, transition in enumerate(transitions, 1):
            timestamp = transition["timestamp"]
            prev_timestamp = transition.get("prev_timestamp", timestamp - 1.0)
            # Fall back to a computed label when the cache entry has none.
            timestamp_str = transition.get("timestamp_str") or seconds_to_timestamp(timestamp)

            # Read frame BEFORE transition (when clock was 40) - 2 seconds before
            reading_before = _read_timeouts_at(cap, tracker, fps, prev_timestamp - 2.0)
            if reading_before is None:
                logger.warning("Skipping transition %d (%s): no frame before it", i, timestamp_str)
                continue

            # Read frame AFTER transition, retrying to catch a delayed update
            reading_after = _read_after_transition(cap, tracker, fps, timestamp, reading_before)
            if reading_after is None:
                logger.warning("Skipping transition %d (%s): no frame after it", i, timestamp_str)
                continue

            # Positive change = that team has fewer timeouts left than before
            home_change = reading_before.home_timeouts - reading_after.home_timeouts
            away_change = reading_before.away_timeouts - reading_after.away_timeouts

            detected_team = _detect_timeout_team(home_change, away_change, reading_before.confidence, reading_after.confidence)
            gt_team = is_ground_truth_timeout(timestamp)
            status = _classify_status(gt_team, detected_team)

            before_str = f"({reading_before.home_timeouts},{reading_before.away_timeouts})"
            after_str = f"({reading_after.home_timeouts},{reading_after.away_timeouts})"
            change_str = f"H:{home_change:+d} A:{away_change:+d}"
            print(f"{i:<4} {timestamp_str:<12} {before_str:<14} {after_str:<14} {change_str:<12} {gt_team or '-':<10} {detected_team or '-':<10} {status:<10}")

            results.append(
                {
                    "index": i,
                    "timestamp": timestamp,
                    "timestamp_str": timestamp_str,
                    "before_home": reading_before.home_timeouts,
                    "before_away": reading_before.away_timeouts,
                    "before_conf": reading_before.confidence,
                    "after_home": reading_after.home_timeouts,
                    "after_away": reading_after.away_timeouts,
                    "after_conf": reading_after.confidence,
                    "home_change": home_change,
                    "away_change": away_change,
                    "detected_team": detected_team,
                    "ground_truth_team": gt_team,
                    "status": status,
                }
            )
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()

    # Summary statistics
    print("=" * 100)
    tp = sum(1 for r in results if "TP" in r["status"])
    fp = sum(1 for r in results if "FP" in r["status"])
    fn = sum(1 for r in results if "FN" in r["status"])
    tn = sum(1 for r in results if "TN" in r["status"])
    wrong = sum(1 for r in results if "WRONG" in r["status"])

    print("\nSUMMARY:")
    print(f" True Positives (correct timeouts): {tp}")
    print(f" False Positives (spurious detections): {fp}")
    print(f" False Negatives (missed timeouts): {fn}")
    print(f" Wrong Team: {wrong}")
    print(f" True Negatives (correct non-timeout): {tn}")

    # A wrong-team detection is both a ground truth timeout that was not
    # correctly found and a detection that was not correct, so it belongs in
    # both denominators.
    gt_total = tp + fn + wrong
    detected_total = tp + fp + wrong
    if gt_total > 0:
        recall = tp / gt_total
        print(f"\n Recall (of ground truth timeouts): {recall:.1%} ({tp}/{gt_total})")
    if detected_total > 0:
        precision = tp / detected_total
        print(f" Precision (of detected timeouts): {precision:.1%} ({tp}/{detected_total})")

    # Save detailed results
    output_path = Path("output/cache/timeout_tracker_evaluation.json")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "summary": {"tp": tp, "fp": fp, "fn": fn, "tn": tn, "wrong": wrong},
                "results": results,
            },
            f,
            indent=2,
        )
    print(f"\nDetailed results saved to: {output_path}")
def visualize_timeout_regions(
    timestamp: float,
    video_path: str = "full_videos/OSU vs Tenn 12.21.24.mkv",
    config_path: str = "data/config/timeout_tracker_region.json",
) -> None:
    """Visualize timeout regions at a specific timestamp for debugging.

    Reads the frame at ``timestamp``, runs the timeout tracker on it and
    writes the tracker's visualization to
    ``output/debug/timeout_tracker/vis_<seconds>.png``. Failures are logged
    and the function returns without raising.

    Args:
        timestamp: Position in the video, in seconds
        video_path: Video to read the frame from
        config_path: Timeout tracker region config to use
    """
    tracker = DetectTimeouts(config_path=str(config_path))
    if not tracker.is_configured():
        logger.error("Timeout tracker not configured")
        return

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            logger.error("Could not open video: %s", video_path)
            return
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Clamp so a negative timestamp cannot produce a negative frame index.
        frame_num = max(int(timestamp * fps), 0)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
        ret, frame = cap.read()
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()

    if not ret:
        logger.error("Could not read frame at %.1fs", timestamp)
        return

    reading = tracker.read_timeouts(frame)
    vis_frame = tracker.visualize(frame, reading)

    output_path = f"output/debug/timeout_tracker/vis_{int(timestamp)}.png"
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    # imwrite reports failure through its return value rather than raising.
    if not cv2.imwrite(output_path, vis_frame):
        logger.error("Could not write visualization to %s", output_path)
        return
    logger.info("Saved visualization to %s", output_path)
    logger.info(" HOME: %d, AWAY: %d, conf: %.2f", reading.home_timeouts, reading.away_timeouts, reading.confidence)
if __name__ == "__main__":
    # Script entry point: run the evaluation over all cached transitions.
    test_timeout_at_transitions()

    # For debugging, the indicator regions can be dumped at chosen timestamps:
    # visualize_timeout_regions(265)  # Before first timeout at 4:25
    # visualize_timeout_regions(270)  # After first timeout