# cfb40/scripts/cache_playclock_readings.py
"""
Cache all play clock readings for the OSU vs Tenn video.
This script:
1. Loads the template library
2. Reads play clock values from fixed regions for every frame (sampled at 0.5s)
3. Caches the results as JSON
4. Identifies all 40→25 transitions for timeout tracker testing
Usage:
python scripts/cache_playclock_readings.py
"""
import json
import logging
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from readers import ReadPlayClock
from setup import DigitTemplateLibrary
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def seconds_to_timestamp(seconds: float) -> str:
    """Format a duration in seconds as H:MM:SS, or M:SS when under an hour."""
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes}:{secs:02d}"
def cache_playclock_readings(
    video_path: str,
    template_dir: str,
    output_path: str,
    playclock_coords: Tuple[int, int, int, int],
    sample_interval: float = 0.5,
    start_time: float = 0.0,
    end_time: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Cache all play clock readings for a video.

    Seeks through the video at ``sample_interval`` steps, runs template-based
    OCR on the fixed play clock region of each sampled frame, saves the
    readings (plus detected 40→25 transitions) to ``output_path`` as JSON,
    and returns the same payload.

    Args:
        video_path: Path to video file
        template_dir: Path to digit templates directory
        output_path: Path to save cached readings
        playclock_coords: (x, y, width, height) absolute coordinates
        sample_interval: Seconds between samples (default 0.5); must be > 0
        start_time: Start time in seconds
        end_time: End time in seconds (None for full video)

    Returns:
        Dictionary with cached readings and transitions

    Raises:
        ValueError: If sample_interval is not positive.
        RuntimeError: If templates fail to load, the video cannot be opened,
            or the video reports a non-positive FPS.
    """
    if sample_interval <= 0:
        # A non-positive interval would loop forever (or divide by zero below).
        raise ValueError(f"sample_interval must be positive, got {sample_interval}")
    # Load template library
    logger.info("Loading template library from %s", template_dir)
    library = DigitTemplateLibrary()
    if not library.load(template_dir):
        raise RuntimeError(f"Failed to load templates from {template_dir}")
    coverage = library.get_coverage_status()
    logger.info("Template coverage: %d/%d (complete: %s)", coverage["total_have"], coverage["total_needed"], coverage["is_complete"])
    # Create play clock reader sized to the fixed region
    reader = ReadPlayClock(library, region_width=playclock_coords[2], region_height=playclock_coords[3])
    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open video: {video_path}")
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Guard against ZeroDivisionError on corrupt/unsupported containers.
            raise RuntimeError(f"Video reports invalid FPS ({fps}): {video_path}")
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps
        if end_time is None:
            end_time = duration
        logger.info("Video: %s", video_path)
        logger.info(" FPS: %.2f, Total frames: %d, Duration: %s", fps, total_frames, seconds_to_timestamp(duration))
        logger.info(" Processing: %s to %s", seconds_to_timestamp(start_time), seconds_to_timestamp(end_time))
        logger.info(" Play clock region: (%d, %d, %d, %d)", *playclock_coords)
        # Cache readings
        readings: List[Dict[str, Any]] = []
        current_time = start_time
        frames_processed = 0
        # Log progress roughly every 5 minutes of video time; max(1, ...)
        # prevents a zero modulus when sample_interval > 300 seconds.
        progress_every = max(1, int(300 / sample_interval))
        t_start = time.perf_counter()
        while current_time <= end_time:
            frame_num = int(current_time * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                # Unreadable frame (e.g. seek past EOF): skip ahead, don't abort.
                current_time += sample_interval
                continue
            # Read play clock using fixed coordinates
            ocr_result = reader.read_from_fixed_location(frame, playclock_coords)
            readings.append({
                "timestamp": current_time,
                "frame_num": frame_num,
                "detected": ocr_result.detected,
                "value": ocr_result.value,
                "confidence": ocr_result.confidence,
                "method": ocr_result.method,
            })
            frames_processed += 1
            current_time += sample_interval
            if frames_processed % progress_every == 0:
                elapsed = time.perf_counter() - t_start
                video_minutes = current_time / 60
                logger.info(" Processed %d frames (%.1f min), elapsed: %.1fs", frames_processed, video_minutes, elapsed)
    finally:
        # Always release the capture, even if OCR or seeking raises.
        cap.release()
    elapsed = time.perf_counter() - t_start
    # Avoid division by zero if processing finished (near-)instantly.
    rate = frames_processed / elapsed if elapsed > 0 else 0.0
    logger.info("Processed %d frames in %.1fs (%.1f fps)", frames_processed, elapsed, rate)
    # Identify 40→25 transitions
    transitions = find_40_to_25_transitions(readings)
    logger.info("Found %d potential 40→25 transitions", len(transitions))
    # Build result payload (named cache_data so it doesn't shadow the per-frame OCR result)
    cache_data = {
        "video": Path(video_path).name,
        "config": {
            "playclock_coords": list(playclock_coords),
            "sample_interval": sample_interval,
            "start_time": start_time,
            "end_time": end_time,
        },
        "stats": {
            "total_readings": len(readings),
            "detected_readings": sum(1 for r in readings if r["detected"]),
            "processing_time": elapsed,
        },
        "readings": readings,
        "transitions_40_to_25": transitions,
    }
    # Save to file
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(cache_data, f, indent=2)
    logger.info("Saved cache to %s", output_path)
    return cache_data
def find_40_to_25_transitions(readings: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Scan cached readings for moments where the play clock jumps from 40 to 25.

    Only entries that are detected with a non-None value participate; each
    such entry is compared against the most recent valid one, and a
    transition is recorded when the previous value was 40 and the current
    value is 25.

    Args:
        readings: Chronologically ordered clock readings.

    Returns:
        One event dict per 40→25 jump, carrying the reading index,
        current/previous timestamps, and both clock values.
    """
    events: List[Dict[str, Any]] = []
    last_value: Optional[int] = None
    last_ts: Optional[float] = None
    for idx, entry in enumerate(readings):
        # Skip readings where OCR failed or produced no value.
        if not entry["detected"] or entry["value"] is None:
            continue
        value = entry["value"]
        ts = entry["timestamp"]
        if last_value == 40 and value == 25:
            events.append({
                "index": idx,
                "timestamp": ts,
                "timestamp_str": seconds_to_timestamp(ts),
                "prev_timestamp": last_ts,
                "prev_value": last_value,
                "curr_value": value,
            })
            logger.debug("Found 40→25 transition at %s (index %d)", seconds_to_timestamp(ts), idx)
        last_value = value
        last_ts = ts
    return events
def print_transitions_summary(cache_file: str) -> None:
    """Print a summary of transitions from a cache file.

    Args:
        cache_file: Path to a JSON cache produced by cache_playclock_readings.
    """
    with open(cache_file, "r", encoding="utf-8") as f:
        data = json.load(f)
    transitions = data.get("transitions_40_to_25", [])
    print("\n" + "=" * 80)
    print("40→25 CLOCK TRANSITIONS")
    print("=" * 80)
    print(f"Total transitions found: {len(transitions)}")
    print("-" * 80)
    print(f"{'#':<4} {'Timestamp':<12} {'Prev→Curr':<12} {'Index':<8}")
    print("-" * 80)
    for i, t in enumerate(transitions, 1):
        # Bug fix: the original format string omitted the "→" between the
        # previous and current values, printing e.g. "4025" under the
        # 'Prev→Curr' header; format the pair first so it aligns with the
        # 12-character header column.
        pair = f"{t['prev_value']}→{t['curr_value']}"
        print(f"{i:<4} {t['timestamp_str']:<12} {pair:<12} {t['index']:<8}")
    print("=" * 80)
def main():
    """Cache play clock readings for the OSU vs Tenn video and print a summary."""
    # Configuration
    video_path = "full_videos/OSU vs Tenn 12.21.24.mkv"
    template_dir = "output/debug/digit_templates"
    output_path = "output/cache/playclock_readings_full.json"
    # Play clock absolute coordinates (scorebug_x + offset_x, scorebug_y + offset_y, width, height)
    # From config: scorebug (131, 972), offset (899, 18), size (51, 28)
    playclock_coords = (131 + 899, 972 + 18, 51, 28)
    # Process full video
    result = cache_playclock_readings(
        video_path=video_path,
        template_dir=template_dir,
        output_path=output_path,
        playclock_coords=playclock_coords,
        sample_interval=0.5,  # Match pipeline interval
    )
    # Print summary
    print_transitions_summary(output_path)
    # Also print detection rate; guard against division by zero when no
    # frames were successfully sampled.
    stats = result["stats"]
    total = stats["total_readings"]
    if total:
        detection_rate = stats["detected_readings"] / total * 100
        print(f"\nDetection rate: {stats['detected_readings']}/{total} ({detection_rate:.1f}%)")
    else:
        print("\nDetection rate: 0/0 (no readings)")


if __name__ == "__main__":
    main()