Spaces:
Sleeping
Sleeping
| """ | |
| Diagnose play clock reading issues on the OSU vs Oregon video. | |
| This script: | |
| 1. Loads the saved config for Oregon | |
| 2. Extracts frames at various timestamps | |
| 3. Tests play clock reading at each frame | |
| 4. Saves debug images showing the extracted regions and preprocessing results | |
| 5. Reports the distribution of play clock readings | |
| Usage: | |
| python scripts/diagnose_oregon_playclock.py | |
| """ | |
| import json | |
| import logging | |
| import sys | |
| from collections import Counter | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import cv2 | |
| import numpy as np | |
| # Add src to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / "src")) | |
| from readers import ReadPlayClock | |
| from setup import DigitTemplateLibrary | |
| from utils.regions import preprocess_playclock_region | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
def seconds_to_timestamp(seconds: float) -> str:
    """Format a duration in seconds as a compact clock string.

    Fractional seconds are dropped, not rounded. Values of one hour or more
    are rendered as ``H:MM:SS``; anything shorter is rendered as ``M:SS``.
    """
    whole_hours, within_hour = divmod(seconds, 3600)
    hh = int(whole_hours)
    mm = int(within_hour // 60)
    ss = int(seconds % 60)
    if hh <= 0:
        # No hour component: use the short minutes:seconds form.
        return f"{mm}:{ss:02d}"
    return f"{hh}:{mm:02d}:{ss:02d}"
def load_oregon_config() -> Dict[str, Any]:
    """Load the saved config for the Oregon video.

    The config is read from ``output/OSU_vs_Oregon_01_01_25_config.json``,
    resolved relative to the current working directory.

    Returns:
        The parsed JSON document.

    Raises:
        FileNotFoundError: If the config file does not exist.
    """
    config_path = Path("output/OSU_vs_Oregon_01_01_25_config.json")
    try:
        # JSON is UTF-8 by specification; do not depend on the locale's default encoding.
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(f"Oregon config not found: {config_path}") from err
def _find_detection_gaps(readings: List[Dict[str, Any]], min_duration: float = 5.0) -> List[Dict[str, float]]:
    """Find runs of consecutive non-detected readings spanning at least ``min_duration`` seconds.

    A gap starts at the first non-detected reading of a run and ends at the next
    detected reading. A run that is still open when the readings end is closed at
    the last reading's timestamp, so a gap reaching the end of the sampled range
    is reported as well.
    """
    gaps: List[Dict[str, float]] = []
    gap_start = None
    for r in readings:
        if not r["detected"]:
            if gap_start is None:
                gap_start = r["timestamp"]
            continue
        if gap_start is not None:
            gap_duration = r["timestamp"] - gap_start
            if gap_duration >= min_duration:
                gaps.append({"start": gap_start, "end": r["timestamp"], "duration": gap_duration})
            gap_start = None

    # Close a gap that was still open at the end of the readings.
    if gap_start is not None:
        gap_end = readings[-1]["timestamp"]
        gap_duration = gap_end - gap_start
        if gap_duration >= min_duration:
            gaps.append({"start": gap_start, "end": gap_end, "duration": gap_duration})
    return gaps


def diagnose_playclock_readings(
    video_path: str,
    playclock_coords: Tuple[int, int, int, int],
    scorebug_coords: Tuple[int, int, int, int],
    template_dir: str,
    output_dir: str,
    sample_timestamps: Optional[List[float]] = None,
    sample_interval: float = 0.5,
    max_samples: int = 1000,
) -> Dict[str, Any]:
    """
    Diagnose play clock reading issues by sampling frames and analyzing results.

    Besides returning the results, this writes ``diagnosis_results.json`` and up to
    20 debug PNGs into ``output_dir`` and logs a summary (detection rate, value
    distribution, detection gaps of 5 seconds or more).

    Args:
        video_path: Path to video file
        playclock_coords: (x, y, width, height) absolute coordinates
        scorebug_coords: (x, y, width, height) scorebug coordinates
        template_dir: Path to digit templates
        output_dir: Directory to save debug images (created if missing)
        sample_timestamps: Optional specific timestamps (seconds) to sample
        sample_interval: Interval between samples when not using specific timestamps
        max_samples: Maximum number of samples to generate when timestamps are not given

    Returns:
        Dictionary with keys "video", "playclock_coords", "scorebug_coords",
        "template_coverage", "stats", "value_distribution", "gaps" and "readings".

    Raises:
        RuntimeError: If the templates cannot be loaded, the video cannot be
            opened, or the video reports a non-positive frame rate.
    """
    # Create output directory
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Load template library
    logger.info("Loading template library from %s", template_dir)
    library = DigitTemplateLibrary()
    if not library.load(template_dir):
        raise RuntimeError(f"Failed to load templates from {template_dir}")
    coverage = library.get_coverage_status()
    logger.info("Template coverage: %d/%d (complete: %s)", coverage["total_have"], coverage["total_needed"], coverage["is_complete"])

    # Log available templates
    tens_templates = library.get_all_templates(is_tens=True)
    ones_templates = library.get_all_templates(is_tens=False)
    all_templates = tens_templates + ones_templates
    logger.info("Available templates (%d total):", len(all_templates))
    for template in all_templates:
        logger.info(
            " - %s digit %d (position=%s, samples=%d)",
            "Tens" if template.is_tens_digit else "Ones",
            template.digit_value,
            template.position,
            template.sample_count,
        )

    # Create play clock reader sized to the configured region
    pc_w, pc_h = playclock_coords[2], playclock_coords[3]
    reader = ReadPlayClock(library, region_width=pc_w, region_height=pc_h)

    readings: List[Dict[str, Any]] = []
    value_counts: Counter = Counter()
    detected_count = 0
    read_failures = 0
    debug_images_saved = 0
    max_debug_images = 20

    # Open video; the try/finally guarantees the capture handle is released
    # even if reading or debug-image generation raises.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open video: {video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # OpenCV reports 0 when the frame rate is unknown; dividing by it would crash below.
        if not fps > 0:
            raise RuntimeError(f"Video reports an invalid frame rate ({fps}): {video_path}")
        duration = total_frames / fps
        logger.info("Video: %s", video_path)
        logger.info(" FPS: %.2f, Duration: %s, Total frames: %d", fps, seconds_to_timestamp(duration), total_frames)
        logger.info(" Play clock region: (%d, %d, %d, %d)", *playclock_coords)
        logger.info(" Scorebug region: (%d, %d, %d, %d)", *scorebug_coords)

        # Generate sample timestamps if not provided
        if sample_timestamps is None:
            sample_timestamps = []
            cursor = 0.0
            while cursor < duration and len(sample_timestamps) < max_samples:
                sample_timestamps.append(cursor)
                cursor += sample_interval
        logger.info("Sampling %d timestamps...", len(sample_timestamps))

        # Collect readings
        for i, ts in enumerate(sample_timestamps):
            frame_num = int(ts * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret or frame is None:
                # Unreadable frames are left out of the readings but counted for the log.
                read_failures += 1
                continue

            # Read play clock
            result = reader.read_from_fixed_location(frame, playclock_coords)
            readings.append(
                {
                    "timestamp": ts,
                    "frame_num": frame_num,
                    "detected": result.detected,
                    "value": result.value,
                    "confidence": result.confidence,
                    "method": result.method,
                }
            )
            if result.detected:
                detected_count += 1
                value_counts[result.value] += 1

            # Save debug images for every 20th sample and for detections, up to the cap
            if debug_images_saved < max_debug_images and (i % 20 == 0 or result.detected):
                save_debug_image(frame, playclock_coords, scorebug_coords, result, output_path / f"debug_{i:04d}_t{int(ts)}.png", reader.scale_factor)
                debug_images_saved += 1

            # Progress
            if (i + 1) % 200 == 0:
                pct = detected_count / (i + 1) * 100
                logger.info(" Processed %d/%d samples, detection rate: %.1f%%", i + 1, len(sample_timestamps), pct)
    finally:
        cap.release()

    if read_failures:
        logger.warning("Skipped %d of %d samples whose frames could not be read", read_failures, len(sample_timestamps))

    # Compute statistics
    detection_rate = detected_count / len(readings) * 100 if readings else 0
    logger.info("")
    logger.info("=" * 60)
    logger.info("DIAGNOSIS RESULTS")
    logger.info("=" * 60)
    logger.info("Detection rate: %d/%d (%.1f%%)", detected_count, len(readings), detection_rate)
    logger.info("")
    logger.info("Value distribution:")
    for value, count in sorted(value_counts.items()):
        logger.info(" Clock %d: %d readings (%.1f%%)", value, count, count / detected_count * 100 if detected_count else 0)

    # Identify gaps - consecutive non-detections lasting >= 5 seconds
    gaps = _find_detection_gaps(readings, min_duration=5.0)
    logger.info("")
    logger.info("Detection gaps (>= 5s):")
    for gap in gaps[:20]:  # First 20 gaps
        logger.info(" %s - %s (%.1fs)", seconds_to_timestamp(gap["start"]), seconds_to_timestamp(gap["end"]), gap["duration"])
    if len(gaps) > 20:
        logger.info(" ... and %d more gaps", len(gaps) - 20)

    # Save results
    results = {
        "video": video_path,
        "playclock_coords": list(playclock_coords),
        "scorebug_coords": list(scorebug_coords),
        "template_coverage": coverage,
        "stats": {
            "total_samples": len(readings),
            "detected": detected_count,
            "detection_rate": detection_rate,
        },
        "value_distribution": dict(value_counts),
        "gaps": gaps,
        "readings": readings,
    }
    results_path = output_path / "diagnosis_results.json"
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    logger.info("")
    logger.info("Results saved to %s", results_path)
    logger.info("Debug images saved to %s", output_path)
    return results
def _write_debug_image(path: Path, image: np.ndarray) -> None:
    """Write ``image`` to ``path``, logging a warning when OpenCV reports failure."""
    if not cv2.imwrite(str(path), image):
        logger.warning("Failed to write debug image: %s", path)


def save_debug_image(
    frame: np.ndarray,
    playclock_coords: Tuple[int, int, int, int],
    scorebug_coords: Tuple[int, int, int, int],
    result,
    output_path: Path,
    scale_factor: int = 4,
) -> None:
    """Save a debug image showing the frame, regions, and preprocessing result.

    The image is a half-size copy of the frame with the scorebug (green) and play
    clock (red) rectangles drawn on it. Below it sits a panel holding the play
    clock crop enlarged 4x, its preprocessed version, and the detection result text.

    If the play clock region lies entirely outside the frame, only the annotated
    frame is written (with a warning), so the misplaced rectangles can still be
    inspected instead of the diagnosis crashing.

    Args:
        frame: Video frame to annotate; it is not modified.
        playclock_coords: (x, y, width, height) of the play clock in frame pixels
        scorebug_coords: (x, y, width, height) of the scorebug in frame pixels
        result: Reading result; ``detected``, ``value`` and ``confidence`` are used
        output_path: Destination image path
        scale_factor: Passed through to ``preprocess_playclock_region``
    """
    pc_x, pc_y, pc_w, pc_h = playclock_coords
    sb_x, sb_y, sb_w, sb_h = scorebug_coords

    # Create annotated frame (scaled down for reasonable size)
    frame_scale = 0.5
    frame_small = cv2.resize(frame, None, fx=frame_scale, fy=frame_scale)

    # Draw scorebug region (green)
    sb_rect = (int(sb_x * frame_scale), int(sb_y * frame_scale), int((sb_x + sb_w) * frame_scale), int((sb_y + sb_h) * frame_scale))
    cv2.rectangle(frame_small, (sb_rect[0], sb_rect[1]), (sb_rect[2], sb_rect[3]), (0, 255, 0), 2)

    # Draw play clock region (red)
    pc_rect = (int(pc_x * frame_scale), int(pc_y * frame_scale), int((pc_x + pc_w) * frame_scale), int((pc_y + pc_h) * frame_scale))
    cv2.rectangle(frame_small, (pc_rect[0], pc_rect[1]), (pc_rect[2], pc_rect[3]), (0, 0, 255), 2)

    # Extract play clock region at original scale
    playclock_region = frame[pc_y : pc_y + pc_h, pc_x : pc_x + pc_w].copy()
    if playclock_region.size == 0:
        # An empty crop cannot be resized or preprocessed; keep the annotated frame only.
        logger.warning(
            "Play clock region (%d, %d, %d, %d) is outside the %dx%d frame; saving annotated frame only",
            pc_x, pc_y, pc_w, pc_h, frame.shape[1], frame.shape[0],
        )
        _write_debug_image(output_path, frame_small)
        return

    # Preprocess for visualization
    # NOTE(review): GRAY2BGR assumes the preprocessed image is single-channel - confirm against the helper.
    preprocessed = preprocess_playclock_region(playclock_region, scale_factor)
    preprocessed_bgr = cv2.cvtColor(preprocessed, cv2.COLOR_GRAY2BGR)

    # Scale up the raw crop for visibility (nearest-neighbour keeps pixels crisp)
    pc_scaled = cv2.resize(playclock_region, None, fx=4, fy=4, interpolation=cv2.INTER_NEAREST)
    pc_h_scaled, pc_w_scaled = pc_scaled.shape[:2]

    # Ensure preprocessed matches playclock scaled height so both fit the same panel rows
    prep_h, prep_w = preprocessed_bgr.shape[:2]
    if prep_h != pc_h_scaled:
        preprocessed_bgr = cv2.resize(preprocessed_bgr, (prep_w, pc_h_scaled))

    # Create panel for play clock and preprocessed: [playclock_region_scaled] [preprocessed]
    font = cv2.FONT_HERSHEY_SIMPLEX
    panel_height = max(pc_h_scaled + 30, 100)  # leave room for the text line under the images
    panel_width = pc_w_scaled + preprocessed_bgr.shape[1] + 20
    panel = np.zeros((panel_height, panel_width, 3), dtype=np.uint8)

    # Place regions on panel with a 10px gutter between them
    panel[0:pc_h_scaled, 0:pc_w_scaled] = pc_scaled
    panel[0 : preprocessed_bgr.shape[0], pc_w_scaled + 10 : pc_w_scaled + 10 + preprocessed_bgr.shape[1]] = preprocessed_bgr

    # Add detection result text
    if result.detected:
        text = f"Detected: {result.value} (conf: {result.confidence:.2f})"
        color = (0, 255, 0)  # Green
    else:
        text = f"NOT detected (conf: {result.confidence:.2f})"
        color = (0, 0, 255)  # Red
    cv2.putText(panel, text, (5, panel_height - 5), font, 0.4, color, 1)

    # Combine frame and panel: panel goes below the frame with a 10px spacer
    frame_h, frame_w = frame_small.shape[:2]
    composite_h = frame_h + panel_height + 10
    composite_w = max(frame_w, panel_width)
    composite = np.zeros((composite_h, composite_w, 3), dtype=np.uint8)
    composite[0:frame_h, 0:frame_w] = frame_small
    composite[frame_h + 10 : frame_h + 10 + panel_height, 0:panel_width] = panel
    _write_debug_image(output_path, composite)
def main():
    """Run the play clock diagnosis for the Oregon video using its saved config.

    Derives the absolute play clock rectangle from the scorebug position plus the
    configured offsets, runs the diagnosis, logs a summary, and warns when the
    detection rate is below 20%.
    """
    cfg = load_oregon_config()
    video_path = cfg["video_path"]

    # The play clock is stored relative to the scorebug; convert to absolute coordinates.
    sb_x = cfg["scorebug_x"]
    x_off = cfg["playclock_x_offset"]
    sb_y = cfg["scorebug_y"]
    y_off = cfg["playclock_y_offset"]
    playclock_coords = (sb_x + x_off, sb_y + y_off, cfg["playclock_width"], cfg["playclock_height"])
    scorebug_coords = (sb_x, sb_y, cfg["scorebug_width"], cfg["scorebug_height"])

    rule = "=" * 60
    logger.info("Oregon Video Play Clock Diagnosis")
    logger.info(rule)
    logger.info("Video: %s", video_path)
    logger.info("Scorebug: (%d, %d, %d, %d)", *scorebug_coords)
    logger.info("Play clock offset: (%d, %d)", x_off, y_off)
    logger.info("Play clock absolute: (%d, %d, %d, %d)", *playclock_coords)

    # Run diagnosis
    results = diagnose_playclock_readings(
        video_path=video_path,
        playclock_coords=playclock_coords,
        scorebug_coords=scorebug_coords,
        template_dir="output/debug/digit_templates",
        output_dir="output/debug/oregon_playclock_diagnosis",
        sample_interval=0.5,
        max_samples=2000,  # First ~16 minutes
    )

    # Summary
    stats = results["stats"]
    rate = stats["detection_rate"]
    logger.info("")
    logger.info("SUMMARY")
    logger.info(rule)
    logger.info("Detection rate: %.1f%% (%d/%d)", rate, stats["detected"], stats["total_samples"])

    if rate < 20:
        # Each line is logged without format args, so the literal "%" is emitted as-is.
        low_rate_hints = (
            "",
            "WARNING: Very low detection rate (<20%)",
            "Possible causes:",
            " 1. Play clock region coordinates are incorrect",
            " 2. Digit templates don't match this video's font/style",
            " 3. Preprocessing (color normalization) is failing",
            "",
            "Check the debug images in output/debug/oregon_playclock_diagnosis/",
        )
        for hint in low_rate_hints:
            logger.warning(hint)


if __name__ == "__main__":
    main()