Spaces:
Sleeping
Sleeping
| """ | |
| Diagnose play clock reading during actual gameplay on the OSU vs Oregon video. | |
| This script specifically targets gameplay portions of the video, starting from | |
| around when the first play was detected (333s). | |
| Usage: | |
| python scripts/diagnose_oregon_gameplay.py | |
| """ | |
| import json | |
| import logging | |
| import sys | |
| from collections import Counter | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Tuple | |
| import cv2 | |
| import numpy as np | |
| # Add src to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / "src")) | |
| from readers import ReadPlayClock | |
| from setup import DigitTemplateLibrary | |
| from utils.regions import preprocess_playclock_region | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
| def seconds_to_timestamp(seconds: float) -> str: | |
| """Convert seconds to timestamp string (H:MM:SS).""" | |
| hours = int(seconds // 3600) | |
| minutes = int((seconds % 3600) // 60) | |
| secs = int(seconds % 60) | |
| if hours > 0: | |
| return f"{hours}:{minutes:02d}:{secs:02d}" | |
| return f"{minutes}:{secs:02d}" | |
| def load_oregon_config() -> Dict[str, Any]: | |
| """Load the saved config for Oregon video.""" | |
| config_path = Path("output/OSU_vs_Oregon_01_01_25_config.json") | |
| with open(config_path, "r") as f: | |
| return json.load(f) | |
| def diagnose_gameplay_segment( | |
| video_path: str, | |
| playclock_coords: Tuple[int, int, int, int], | |
| scorebug_coords: Tuple[int, int, int, int], | |
| template_dir: str, | |
| output_dir: str, | |
| start_time: float = 330.0, # Start around first detected play | |
| duration: float = 120.0, # 2 minutes of gameplay | |
| sample_interval: float = 0.5, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Diagnose play clock reading during a specific gameplay segment. | |
| """ | |
| # Create output directory | |
| output_path = Path(output_dir) | |
| output_path.mkdir(parents=True, exist_ok=True) | |
| # Load template library | |
| logger.info("Loading template library from %s", template_dir) | |
| library = DigitTemplateLibrary() | |
| if not library.load(template_dir): | |
| raise RuntimeError(f"Failed to load templates from {template_dir}") | |
| coverage = library.get_coverage_status() | |
| logger.info("Template coverage: %d/%d", coverage["total_have"], coverage["total_needed"]) | |
| # Create play clock reader | |
| pc_w, pc_h = playclock_coords[2], playclock_coords[3] | |
| reader = ReadPlayClock(library, region_width=pc_w, region_height=pc_h) | |
| # Open video | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise RuntimeError(f"Failed to open video: {video_path}") | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| logger.info("Video: %s", video_path) | |
| logger.info(" Analyzing: %s - %s", seconds_to_timestamp(start_time), seconds_to_timestamp(start_time + duration)) | |
| logger.info(" Play clock region: (%d, %d, %d, %d)", *playclock_coords) | |
| # Sample frames | |
| readings = [] | |
| detected_count = 0 | |
| value_counts: Counter = Counter() | |
| current_time = start_time | |
| end_time = start_time + duration | |
| sample_idx = 0 | |
| while current_time < end_time: | |
| frame_num = int(current_time * fps) | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) | |
| ret, frame = cap.read() | |
| if not ret or frame is None: | |
| current_time += sample_interval | |
| continue | |
| # Read play clock | |
| result = reader.read_from_fixed_location(frame, playclock_coords) | |
| reading = { | |
| "timestamp": current_time, | |
| "detected": result.detected, | |
| "value": result.value, | |
| "confidence": result.confidence, | |
| "tens_conf": result.tens_match.confidence if result.tens_match else None, | |
| "ones_conf": result.ones_match.confidence if result.ones_match else None, | |
| } | |
| readings.append(reading) | |
| if result.detected: | |
| detected_count += 1 | |
| value_counts[result.value] += 1 | |
| # Save debug images every 5 samples | |
| if sample_idx % 10 == 0: | |
| save_gameplay_debug( | |
| frame, playclock_coords, scorebug_coords, result, current_time, output_path / f"gameplay_{sample_idx:04d}_t{int(current_time)}.png", reader.scale_factor | |
| ) | |
| current_time += sample_interval | |
| sample_idx += 1 | |
| cap.release() | |
| # Compute statistics | |
| detection_rate = detected_count / len(readings) * 100 if readings else 0 | |
| # Analyze consecutive patterns | |
| consecutive_failures = analyze_consecutive_failures(readings) | |
| logger.info("") | |
| logger.info("=" * 60) | |
| logger.info("GAMEPLAY SEGMENT ANALYSIS (%s - %s)", seconds_to_timestamp(start_time), seconds_to_timestamp(end_time)) | |
| logger.info("=" * 60) | |
| logger.info("Detection rate: %d/%d (%.1f%%)", detected_count, len(readings), detection_rate) | |
| logger.info("") | |
| logger.info("Value distribution:") | |
| for value in sorted(value_counts.keys()): | |
| count = value_counts[value] | |
| logger.info(" Clock %d: %d readings (%.1f%%)", value, count, count / detected_count * 100 if detected_count else 0) | |
| logger.info("") | |
| logger.info("Consecutive failures analysis:") | |
| logger.info(" Longest streak without detection: %d samples (%.1fs)", consecutive_failures["max_streak"], consecutive_failures["max_streak"] * sample_interval) | |
| logger.info(" Failure streaks >= 5 samples: %d", consecutive_failures["long_streaks"]) | |
| # Save results | |
| results = { | |
| "segment": {"start": start_time, "end": end_time}, | |
| "stats": { | |
| "total_samples": len(readings), | |
| "detected": detected_count, | |
| "detection_rate": detection_rate, | |
| }, | |
| "value_distribution": dict(value_counts), | |
| "consecutive_failures": consecutive_failures, | |
| "readings": readings, | |
| } | |
| results_path = output_path / "gameplay_analysis.json" | |
| with open(results_path, "w") as f: | |
| json.dump(results, f, indent=2) | |
| logger.info("") | |
| logger.info("Results saved to %s", results_path) | |
| return results | |
| def analyze_consecutive_failures(readings: List[Dict]) -> Dict[str, Any]: | |
| """Analyze patterns of consecutive detection failures.""" | |
| current_streak = 0 | |
| max_streak = 0 | |
| long_streaks = 0 # Streaks >= 5 | |
| for r in readings: | |
| if not r["detected"]: | |
| current_streak += 1 | |
| if current_streak > max_streak: | |
| max_streak = current_streak | |
| else: | |
| if current_streak >= 5: | |
| long_streaks += 1 | |
| current_streak = 0 | |
| # Final streak | |
| if current_streak >= 5: | |
| long_streaks += 1 | |
| return { | |
| "max_streak": max_streak, | |
| "long_streaks": long_streaks, | |
| } | |
| def save_gameplay_debug( | |
| frame: np.ndarray, | |
| playclock_coords: Tuple[int, int, int, int], | |
| scorebug_coords: Tuple[int, int, int, int], | |
| result, | |
| timestamp: float, | |
| output_path: Path, | |
| scale_factor: int = 4, | |
| ) -> None: | |
| """Save a debug image for gameplay analysis.""" | |
| pc_x, pc_y, pc_w, pc_h = playclock_coords | |
| sb_x, sb_y, sb_w, sb_h = scorebug_coords | |
| # Extract the scorebug area plus some context | |
| margin = 20 | |
| crop_y1 = max(0, sb_y - margin) | |
| crop_y2 = min(frame.shape[0], sb_y + sb_h + margin) | |
| crop_x1 = max(0, sb_x - margin) | |
| crop_x2 = min(frame.shape[1], sb_x + sb_w + margin) | |
| scorebug_crop = frame[crop_y1:crop_y2, crop_x1:crop_x2].copy() | |
| # Draw play clock region on crop (adjust coordinates) | |
| pc_rel_x = pc_x - crop_x1 | |
| pc_rel_y = pc_y - crop_y1 | |
| cv2.rectangle(scorebug_crop, (pc_rel_x, pc_rel_y), (pc_rel_x + pc_w, pc_rel_y + pc_h), (0, 0, 255), 2) | |
| # Extract play clock region for detail view | |
| playclock_region = frame[pc_y : pc_y + pc_h, pc_x : pc_x + pc_w].copy() | |
| pc_scaled = cv2.resize(playclock_region, None, fx=8, fy=8, interpolation=cv2.INTER_NEAREST) | |
| # Preprocess for visualization | |
| preprocessed = preprocess_playclock_region(playclock_region, scale_factor) | |
| preprocessed_bgr = cv2.cvtColor(preprocessed, cv2.COLOR_GRAY2BGR) | |
| preprocessed_scaled = cv2.resize(preprocessed_bgr, (pc_scaled.shape[1], pc_scaled.shape[0]), interpolation=cv2.INTER_NEAREST) | |
| # Scale up scorebug crop | |
| scorebug_scaled = cv2.resize(scorebug_crop, None, fx=2, fy=2) | |
| # Create composite | |
| composite_height = max(scorebug_scaled.shape[0], pc_scaled.shape[0] * 2 + 10) | |
| composite_width = scorebug_scaled.shape[1] + max(pc_scaled.shape[1], preprocessed_scaled.shape[1]) + 30 | |
| composite = np.zeros((composite_height + 40, composite_width, 3), dtype=np.uint8) | |
| # Place scorebug | |
| composite[0 : scorebug_scaled.shape[0], 0 : scorebug_scaled.shape[1]] = scorebug_scaled | |
| # Place original playclock (scaled) | |
| x_offset = scorebug_scaled.shape[1] + 20 | |
| composite[0 : pc_scaled.shape[0], x_offset : x_offset + pc_scaled.shape[1]] = pc_scaled | |
| # Place preprocessed below | |
| y_offset = pc_scaled.shape[0] + 10 | |
| composite[y_offset : y_offset + preprocessed_scaled.shape[0], x_offset : x_offset + preprocessed_scaled.shape[1]] = preprocessed_scaled | |
| # Add text | |
| font = cv2.FONT_HERSHEY_SIMPLEX | |
| if result.detected: | |
| text = f"t={seconds_to_timestamp(timestamp)} | Clock: {result.value} | Conf: {result.confidence:.2f}" | |
| color = (0, 255, 0) | |
| else: | |
| text = f"t={seconds_to_timestamp(timestamp)} | NOT DETECTED | Conf: {result.confidence:.2f}" | |
| color = (0, 0, 255) | |
| cv2.putText(composite, text, (10, composite_height + 25), font, 0.5, color, 1) | |
| cv2.imwrite(str(output_path), composite) | |
| def main(): | |
| """Main function to analyze Oregon gameplay.""" | |
| # Load Oregon config | |
| config = load_oregon_config() | |
| video_path = config["video_path"] | |
| # Calculate absolute play clock coordinates | |
| playclock_coords = ( | |
| config["scorebug_x"] + config["playclock_x_offset"], | |
| config["scorebug_y"] + config["playclock_y_offset"], | |
| config["playclock_width"], | |
| config["playclock_height"], | |
| ) | |
| scorebug_coords = ( | |
| config["scorebug_x"], | |
| config["scorebug_y"], | |
| config["scorebug_width"], | |
| config["scorebug_height"], | |
| ) | |
| # Analyze first gameplay segment (around first detected play at 333.3s) | |
| logger.info("Analyzing first gameplay segment...") | |
| results1 = diagnose_gameplay_segment( | |
| video_path=video_path, | |
| playclock_coords=playclock_coords, | |
| scorebug_coords=scorebug_coords, | |
| template_dir="output/debug/digit_templates", | |
| output_dir="output/debug/oregon_gameplay_analysis", | |
| start_time=330.0, | |
| duration=180.0, # 3 minutes | |
| ) | |
| # Compare: Analyze a later segment | |
| logger.info("") | |
| logger.info("=" * 60) | |
| logger.info("Analyzing later gameplay segment for comparison...") | |
| logger.info("=" * 60) | |
| results2 = diagnose_gameplay_segment( | |
| video_path=video_path, | |
| playclock_coords=playclock_coords, | |
| scorebug_coords=scorebug_coords, | |
| template_dir="output/debug/digit_templates", | |
| output_dir="output/debug/oregon_gameplay_later", | |
| start_time=1200.0, # 20 minutes in | |
| duration=180.0, # 3 minutes | |
| ) | |
| # Summary comparison | |
| logger.info("") | |
| logger.info("=" * 60) | |
| logger.info("COMPARISON SUMMARY") | |
| logger.info("=" * 60) | |
| logger.info("Early game (5:30-8:30): %.1f%% detection rate", results1["stats"]["detection_rate"]) | |
| logger.info("Later game (20:00-23:00): %.1f%% detection rate", results2["stats"]["detection_rate"]) | |
| if __name__ == "__main__": | |
| main() | |