""" Cache all play clock readings for the OSU vs Tenn video. This script: 1. Loads the template library 2. Reads play clock values from fixed regions for every frame (sampled at 0.5s) 3. Caches the results as JSON 4. Identifies all 40→25 transitions for timeout tracker testing Usage: python scripts/cache_playclock_readings.py """ import json import logging import sys import time from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from readers import ReadPlayClock from setup import DigitTemplateLibrary logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) def seconds_to_timestamp(seconds: float) -> str: """Convert seconds to timestamp string (H:MM:SS).""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) if hours > 0: return f"{hours}:{minutes:02d}:{secs:02d}" return f"{minutes}:{secs:02d}" def cache_playclock_readings( video_path: str, template_dir: str, output_path: str, playclock_coords: Tuple[int, int, int, int], sample_interval: float = 0.5, start_time: float = 0.0, end_time: Optional[float] = None, ) -> Dict[str, Any]: """ Cache all play clock readings for a video. Args: video_path: Path to video file template_dir: Path to digit templates directory output_path: Path to save cached readings playclock_coords: (x, y, width, height) absolute coordinates sample_interval: Seconds between samples (default 0.5) start_time: Start time in seconds end_time: End time in seconds (None for full video) Returns: Dictionary with cached readings and transitions """ # Load template library logger.info("Loading template library from %s", template_dir) library = DigitTemplateLibrary() if not library.load(template_dir): raise RuntimeError(f"Failed to load templates from {template_dir}") coverage = library.get_coverage_status() logger.info("Template coverage: %d/%d (complete: %s)", coverage["total_have"], coverage["total_needed"], coverage["is_complete"]) # Create play clock reader reader = ReadPlayClock(library, region_width=playclock_coords[2], region_height=playclock_coords[3]) # Open video cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise RuntimeError(f"Failed to open video: {video_path}") fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps if end_time is None: end_time = duration logger.info("Video: %s", video_path) logger.info(" FPS: %.2f, Total frames: %d, Duration: %s", fps, total_frames, seconds_to_timestamp(duration)) logger.info(" Processing: %s to %s", seconds_to_timestamp(start_time), seconds_to_timestamp(end_time)) logger.info(" Play clock region: (%d, %d, %d, %d)", *playclock_coords) # Cache readings readings: List[Dict[str, Any]] = [] current_time = start_time frames_processed = 0 t_start = time.perf_counter() while current_time <= end_time: frame_num = int(current_time * fps) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) ret, frame = cap.read() if not ret: current_time += sample_interval continue # Read play clock using fixed coordinates result = reader.read_from_fixed_location(frame, playclock_coords) reading_entry = { "timestamp": current_time, "frame_num": frame_num, "detected": result.detected, "value": result.value, "confidence": result.confidence, "method": result.method, } readings.append(reading_entry) frames_processed += 1 current_time += sample_interval # Progress log every 5 minutes of video if frames_processed % int(300 / sample_interval) == 0: elapsed = time.perf_counter() - t_start video_minutes = current_time / 60 logger.info(" Processed %d frames (%.1f min), elapsed: %.1fs", frames_processed, video_minutes, elapsed) cap.release() elapsed = time.perf_counter() - t_start logger.info("Processed %d frames in %.1fs (%.1f fps)", frames_processed, elapsed, frames_processed / elapsed) # Identify 40→25 transitions transitions = find_40_to_25_transitions(readings) logger.info("Found %d potential 40→25 transitions", len(transitions)) # Build result result = { "video": Path(video_path).name, "config": { "playclock_coords": list(playclock_coords), "sample_interval": sample_interval, "start_time": start_time, "end_time": end_time, }, "stats": { "total_readings": len(readings), "detected_readings": sum(1 for r in readings if r["detected"]), "processing_time": elapsed, }, "readings": readings, "transitions_40_to_25": transitions, } # Save to file output_file = Path(output_path) output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w", encoding="utf-8") as f: json.dump(result, f, indent=2) logger.info("Saved cache to %s", output_path) return result def find_40_to_25_transitions(readings: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Find all 40→25 clock transitions in the readings. A transition is identified when: 1. Previous reading was 40 (or no previous reading) 2. Current reading is 25 Args: readings: List of clock readings Returns: List of transition events with timestamps and context """ transitions = [] prev_value = None prev_timestamp = None for i, reading in enumerate(readings): if not reading["detected"] or reading["value"] is None: continue curr_value = reading["value"] curr_timestamp = reading["timestamp"] # Check for 40→25 transition if prev_value == 40 and curr_value == 25: transition = { "index": i, "timestamp": curr_timestamp, "timestamp_str": seconds_to_timestamp(curr_timestamp), "prev_timestamp": prev_timestamp, "prev_value": prev_value, "curr_value": curr_value, } transitions.append(transition) logger.debug("Found 40→25 transition at %s (index %d)", seconds_to_timestamp(curr_timestamp), i) prev_value = curr_value prev_timestamp = curr_timestamp return transitions def print_transitions_summary(cache_file: str) -> None: """Print a summary of transitions from a cache file.""" with open(cache_file, "r", encoding="utf-8") as f: data = json.load(f) transitions = data.get("transitions_40_to_25", []) print("\n" + "=" * 80) print("40→25 CLOCK TRANSITIONS") print("=" * 80) print(f"Total transitions found: {len(transitions)}") print("-" * 80) print(f"{'#':<4} {'Timestamp':<12} {'Prev→Curr':<12} {'Index':<8}") print("-" * 80) for i, t in enumerate(transitions, 1): print(f"{i:<4} {t['timestamp_str']:<12} {t['prev_value']}→{t['curr_value']:<8} {t['index']:<8}") print("=" * 80) def main(): """Main function to cache play clock readings.""" # Configuration video_path = "full_videos/OSU vs Tenn 12.21.24.mkv" template_dir = "output/debug/digit_templates" output_path = "output/cache/playclock_readings_full.json" # Play clock absolute coordinates (scorebug_x + offset_x, scorebug_y + offset_y, width, height) # From config: scorebug (131, 972), offset (899, 18), size (51, 28) playclock_coords = (131 + 899, 972 + 18, 51, 28) # Process full video result = cache_playclock_readings( video_path=video_path, template_dir=template_dir, output_path=output_path, playclock_coords=playclock_coords, sample_interval=0.5, # Match pipeline interval ) # Print summary print_transitions_summary(output_path) # Also print detection rate stats = result["stats"] detection_rate = stats["detected_readings"] / stats["total_readings"] * 100 print(f"\nDetection rate: {stats['detected_readings']}/{stats['total_readings']} ({detection_rate:.1f}%)") if __name__ == "__main__": main()