cfb40 / scripts /diagnose_oregon_gameplay.py
andytaylor-smg's picture
I think this fixes Oregon
aee009f
"""
Diagnose play clock reading during actual gameplay on the OSU vs Oregon video.
This script specifically targets gameplay portions of the video, starting from
around when the first play was detected (333s).
Usage:
python scripts/diagnose_oregon_gameplay.py
"""
import json
import logging
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Tuple
import cv2
import numpy as np
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from readers import ReadPlayClock
from setup import DigitTemplateLibrary
from utils.regions import preprocess_playclock_region
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def seconds_to_timestamp(seconds: float) -> str:
"""Convert seconds to timestamp string (H:MM:SS)."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
if hours > 0:
return f"{hours}:{minutes:02d}:{secs:02d}"
return f"{minutes}:{secs:02d}"
def load_oregon_config() -> Dict[str, Any]:
"""Load the saved config for Oregon video."""
config_path = Path("output/OSU_vs_Oregon_01_01_25_config.json")
with open(config_path, "r") as f:
return json.load(f)
def diagnose_gameplay_segment(
video_path: str,
playclock_coords: Tuple[int, int, int, int],
scorebug_coords: Tuple[int, int, int, int],
template_dir: str,
output_dir: str,
start_time: float = 330.0, # Start around first detected play
duration: float = 120.0, # 2 minutes of gameplay
sample_interval: float = 0.5,
) -> Dict[str, Any]:
"""
Diagnose play clock reading during a specific gameplay segment.
"""
# Create output directory
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Load template library
logger.info("Loading template library from %s", template_dir)
library = DigitTemplateLibrary()
if not library.load(template_dir):
raise RuntimeError(f"Failed to load templates from {template_dir}")
coverage = library.get_coverage_status()
logger.info("Template coverage: %d/%d", coverage["total_have"], coverage["total_needed"])
# Create play clock reader
pc_w, pc_h = playclock_coords[2], playclock_coords[3]
reader = ReadPlayClock(library, region_width=pc_w, region_height=pc_h)
# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"Failed to open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
logger.info("Video: %s", video_path)
logger.info(" Analyzing: %s - %s", seconds_to_timestamp(start_time), seconds_to_timestamp(start_time + duration))
logger.info(" Play clock region: (%d, %d, %d, %d)", *playclock_coords)
# Sample frames
readings = []
detected_count = 0
value_counts: Counter = Counter()
current_time = start_time
end_time = start_time + duration
sample_idx = 0
while current_time < end_time:
frame_num = int(current_time * fps)
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
ret, frame = cap.read()
if not ret or frame is None:
current_time += sample_interval
continue
# Read play clock
result = reader.read_from_fixed_location(frame, playclock_coords)
reading = {
"timestamp": current_time,
"detected": result.detected,
"value": result.value,
"confidence": result.confidence,
"tens_conf": result.tens_match.confidence if result.tens_match else None,
"ones_conf": result.ones_match.confidence if result.ones_match else None,
}
readings.append(reading)
if result.detected:
detected_count += 1
value_counts[result.value] += 1
# Save debug images every 5 samples
if sample_idx % 10 == 0:
save_gameplay_debug(
frame, playclock_coords, scorebug_coords, result, current_time, output_path / f"gameplay_{sample_idx:04d}_t{int(current_time)}.png", reader.scale_factor
)
current_time += sample_interval
sample_idx += 1
cap.release()
# Compute statistics
detection_rate = detected_count / len(readings) * 100 if readings else 0
# Analyze consecutive patterns
consecutive_failures = analyze_consecutive_failures(readings)
logger.info("")
logger.info("=" * 60)
logger.info("GAMEPLAY SEGMENT ANALYSIS (%s - %s)", seconds_to_timestamp(start_time), seconds_to_timestamp(end_time))
logger.info("=" * 60)
logger.info("Detection rate: %d/%d (%.1f%%)", detected_count, len(readings), detection_rate)
logger.info("")
logger.info("Value distribution:")
for value in sorted(value_counts.keys()):
count = value_counts[value]
logger.info(" Clock %d: %d readings (%.1f%%)", value, count, count / detected_count * 100 if detected_count else 0)
logger.info("")
logger.info("Consecutive failures analysis:")
logger.info(" Longest streak without detection: %d samples (%.1fs)", consecutive_failures["max_streak"], consecutive_failures["max_streak"] * sample_interval)
logger.info(" Failure streaks >= 5 samples: %d", consecutive_failures["long_streaks"])
# Save results
results = {
"segment": {"start": start_time, "end": end_time},
"stats": {
"total_samples": len(readings),
"detected": detected_count,
"detection_rate": detection_rate,
},
"value_distribution": dict(value_counts),
"consecutive_failures": consecutive_failures,
"readings": readings,
}
results_path = output_path / "gameplay_analysis.json"
with open(results_path, "w") as f:
json.dump(results, f, indent=2)
logger.info("")
logger.info("Results saved to %s", results_path)
return results
def analyze_consecutive_failures(readings: List[Dict]) -> Dict[str, Any]:
"""Analyze patterns of consecutive detection failures."""
current_streak = 0
max_streak = 0
long_streaks = 0 # Streaks >= 5
for r in readings:
if not r["detected"]:
current_streak += 1
if current_streak > max_streak:
max_streak = current_streak
else:
if current_streak >= 5:
long_streaks += 1
current_streak = 0
# Final streak
if current_streak >= 5:
long_streaks += 1
return {
"max_streak": max_streak,
"long_streaks": long_streaks,
}
def save_gameplay_debug(
frame: np.ndarray,
playclock_coords: Tuple[int, int, int, int],
scorebug_coords: Tuple[int, int, int, int],
result,
timestamp: float,
output_path: Path,
scale_factor: int = 4,
) -> None:
"""Save a debug image for gameplay analysis."""
pc_x, pc_y, pc_w, pc_h = playclock_coords
sb_x, sb_y, sb_w, sb_h = scorebug_coords
# Extract the scorebug area plus some context
margin = 20
crop_y1 = max(0, sb_y - margin)
crop_y2 = min(frame.shape[0], sb_y + sb_h + margin)
crop_x1 = max(0, sb_x - margin)
crop_x2 = min(frame.shape[1], sb_x + sb_w + margin)
scorebug_crop = frame[crop_y1:crop_y2, crop_x1:crop_x2].copy()
# Draw play clock region on crop (adjust coordinates)
pc_rel_x = pc_x - crop_x1
pc_rel_y = pc_y - crop_y1
cv2.rectangle(scorebug_crop, (pc_rel_x, pc_rel_y), (pc_rel_x + pc_w, pc_rel_y + pc_h), (0, 0, 255), 2)
# Extract play clock region for detail view
playclock_region = frame[pc_y : pc_y + pc_h, pc_x : pc_x + pc_w].copy()
pc_scaled = cv2.resize(playclock_region, None, fx=8, fy=8, interpolation=cv2.INTER_NEAREST)
# Preprocess for visualization
preprocessed = preprocess_playclock_region(playclock_region, scale_factor)
preprocessed_bgr = cv2.cvtColor(preprocessed, cv2.COLOR_GRAY2BGR)
preprocessed_scaled = cv2.resize(preprocessed_bgr, (pc_scaled.shape[1], pc_scaled.shape[0]), interpolation=cv2.INTER_NEAREST)
# Scale up scorebug crop
scorebug_scaled = cv2.resize(scorebug_crop, None, fx=2, fy=2)
# Create composite
composite_height = max(scorebug_scaled.shape[0], pc_scaled.shape[0] * 2 + 10)
composite_width = scorebug_scaled.shape[1] + max(pc_scaled.shape[1], preprocessed_scaled.shape[1]) + 30
composite = np.zeros((composite_height + 40, composite_width, 3), dtype=np.uint8)
# Place scorebug
composite[0 : scorebug_scaled.shape[0], 0 : scorebug_scaled.shape[1]] = scorebug_scaled
# Place original playclock (scaled)
x_offset = scorebug_scaled.shape[1] + 20
composite[0 : pc_scaled.shape[0], x_offset : x_offset + pc_scaled.shape[1]] = pc_scaled
# Place preprocessed below
y_offset = pc_scaled.shape[0] + 10
composite[y_offset : y_offset + preprocessed_scaled.shape[0], x_offset : x_offset + preprocessed_scaled.shape[1]] = preprocessed_scaled
# Add text
font = cv2.FONT_HERSHEY_SIMPLEX
if result.detected:
text = f"t={seconds_to_timestamp(timestamp)} | Clock: {result.value} | Conf: {result.confidence:.2f}"
color = (0, 255, 0)
else:
text = f"t={seconds_to_timestamp(timestamp)} | NOT DETECTED | Conf: {result.confidence:.2f}"
color = (0, 0, 255)
cv2.putText(composite, text, (10, composite_height + 25), font, 0.5, color, 1)
cv2.imwrite(str(output_path), composite)
def main():
"""Main function to analyze Oregon gameplay."""
# Load Oregon config
config = load_oregon_config()
video_path = config["video_path"]
# Calculate absolute play clock coordinates
playclock_coords = (
config["scorebug_x"] + config["playclock_x_offset"],
config["scorebug_y"] + config["playclock_y_offset"],
config["playclock_width"],
config["playclock_height"],
)
scorebug_coords = (
config["scorebug_x"],
config["scorebug_y"],
config["scorebug_width"],
config["scorebug_height"],
)
# Analyze first gameplay segment (around first detected play at 333.3s)
logger.info("Analyzing first gameplay segment...")
results1 = diagnose_gameplay_segment(
video_path=video_path,
playclock_coords=playclock_coords,
scorebug_coords=scorebug_coords,
template_dir="output/debug/digit_templates",
output_dir="output/debug/oregon_gameplay_analysis",
start_time=330.0,
duration=180.0, # 3 minutes
)
# Compare: Analyze a later segment
logger.info("")
logger.info("=" * 60)
logger.info("Analyzing later gameplay segment for comparison...")
logger.info("=" * 60)
results2 = diagnose_gameplay_segment(
video_path=video_path,
playclock_coords=playclock_coords,
scorebug_coords=scorebug_coords,
template_dir="output/debug/digit_templates",
output_dir="output/debug/oregon_gameplay_later",
start_time=1200.0, # 20 minutes in
duration=180.0, # 3 minutes
)
# Summary comparison
logger.info("")
logger.info("=" * 60)
logger.info("COMPARISON SUMMARY")
logger.info("=" * 60)
logger.info("Early game (5:30-8:30): %.1f%% detection rate", results1["stats"]["detection_rate"])
logger.info("Later game (20:00-23:00): %.1f%% detection rate", results2["stats"]["detection_rate"])
if __name__ == "__main__":
main()