Spaces:
Sleeping
Sleeping
| """ | |
| Play extractor pipeline module. | |
| This module orchestrates the complete play extraction pipeline: | |
| 1. Video frame extraction | |
| 2. Scorebug detection | |
| 3. Play clock reading via template matching | |
| 4. Play state machine processing | |
| 5. Post-hoc clock reset identification (timeout/special plays) | |
| Performance optimizations: | |
| - Streaming processing: read frame -> process immediately (no intermediate storage) | |
| - Threaded video I/O: background thread reads frames while main thread processes | |
| - Template matching for clock reading (~34x faster than OCR) | |
| Note: OCR-based clock reading has been removed in favor of template matching. | |
| See docs/ocr_to_template_migration.md for details. | |
| """ | |
| import json | |
| import logging | |
| import time | |
| from pathlib import Path | |
| from typing import Optional, List, Dict, Any, Tuple, Union | |
| import cv2 | |
| import numpy as np | |
| from detection import DetectScoreBug, ScorebugDetection, DetectTimeouts | |
| from detection.timeouts import CalibratedTimeoutDetector | |
| from readers import FlagReader, ReadPlayClock, PlayClockReading | |
| from setup import DigitTemplateBuilder, DigitTemplateLibrary, PlayClockRegionConfig, PlayClockRegionExtractor | |
| from tracking import FlagInfo, TrackPlayState, PlayEvent, PlayMerger, TimeoutInfo, ClockResetIdentifier | |
| from utils import create_frame_result, log_flag_plays | |
| from video import ThreadedFrameReader | |
| from .models import DetectionConfig, DetectionResult, ParallelProcessingConfig, VideoContext | |
| from .parallel import process_video_parallel | |
| from .template_builder_pass import TemplateBuildingPass | |
| logger = logging.getLogger(__name__) | |
def format_extraction_result_dict(result: DetectionResult) -> Dict[str, Any]:
    """
    Convert a DetectionResult into a plain dictionary.

    The returned mapping groups the result's fields into sections so it can be
    serialized to JSON or handed back from an API.

    Args:
        result: DetectionResult whose fields are copied into the dictionary

    Returns:
        Dictionary with the keys "video", "segment", "processing", "timing",
        "plays" and "stats" (in that order)
    """
    formatted: Dict[str, Any] = {}
    formatted["video"] = result.video
    # Segment bounds as reported by the result
    formatted["segment"] = {
        "start": result.segment_start,
        "end": result.segment_end,
    }
    # Frame counters gathered during processing
    formatted["processing"] = {
        "total_frames": result.total_frames_processed,
        "frames_with_scorebug": result.frames_with_scorebug,
        "frames_with_clock": result.frames_with_clock,
    }
    # The remaining sections are passed through untouched
    formatted["timing"] = result.timing
    formatted["plays"] = result.plays
    formatted["stats"] = result.stats
    return formatted
| class PlayExtractor: | |
| """ | |
| Main pipeline for extracting plays from video. | |
| This class orchestrates all extraction components: | |
| - DetectScoreBug: Locates scorebug in frames | |
| - ReadPlayClock: Reads play clock digits via template matching | |
| - TrackPlayState: Determines play boundaries | |
| - DetectTimeouts: Tracks timeout indicators for 3-class clock reset classification | |
| - ClockResetIdentifier: Post-hoc identification of timeout/special plays | |
| """ | |
def __init__(self, config: DetectionConfig, timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = None, flag_reader: Optional[FlagReader] = None):
    """
    Set up the play extractor pipeline.

    Stores the collaborators, validates the configured paths and then builds
    the processing components.

    Args:
        config: Detection configuration
        timeout_tracker: Optional timeout tracker used for clock reset classification
        flag_reader: Optional FLAG reader used for penalty flag detection
    """
    # Template-matching state starts empty; _initialize_components() fills in
    # whichever of these applies.
    self.template_builder: Optional[DigitTemplateBuilder] = None
    self.template_library: Optional[DigitTemplateLibrary] = None
    self.template_reader: Optional[ReadPlayClock] = None

    # Collaborators supplied by the caller
    self.config = config
    self.timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = timeout_tracker
    self.flag_reader = flag_reader

    # Validate paths first, then create scorebug_detector, clock_reader and state_machine
    self._validate_config()
    self._initialize_components()
def _validate_config(self) -> None:
    """
    Check that the files referenced by the configuration exist.

    The video path is always checked. The scorebug template and clock region
    config paths are only checked when no fixed play clock coordinates are
    configured.

    Raises:
        FileNotFoundError: If a required path does not exist
    """
    cfg = self.config

    if not Path(cfg.video_path).exists():
        raise FileNotFoundError(f"Video not found: {cfg.video_path}")

    # With fixed play clock coordinates the remaining paths are not checked here
    if cfg.fixed_playclock_coords:
        return

    if not Path(cfg.template_path).exists():
        raise FileNotFoundError(f"Scorebug template not found: {cfg.template_path}")

    if not Path(cfg.clock_region_config_path).exists():
        raise FileNotFoundError(f"Clock region config not found: {cfg.clock_region_config_path}")
def _initialize_components(self) -> None:
    """
    Build the scorebug detector, clock region extractor, state machine and template components.

    Two configurations are handled:
    - Standard mode (config.fixed_playclock_coords is None): the detector is created
      from the scorebug template and the extractor from the clock region config file.
    - Fixed coordinates mode: the play clock offset is derived from the absolute play
      clock coordinates, relative to the scorebug origin when
      config.fixed_scorebug_coords is set and relative to (0, 0) otherwise.

    Afterwards pre-built digit templates are loaded when config.digit_template_path
    points at an existing path. If no library ends up loaded, a DigitTemplateBuilder
    is created so templates can be collected during extraction.
    """
    logger.info("Initializing play extractor components...")
    cfg = self.config
    fixed_clock = cfg.fixed_playclock_coords

    if fixed_clock is None:
        # Standard mode: everything comes from the template and config files
        self.scorebug_detector: DetectScoreBug = DetectScoreBug(
            template_path=cfg.template_path,
            use_split_detection=cfg.use_split_detection,
        )
        logger.info("Scorebug detector initialized (split_detection=%s)", cfg.use_split_detection)

        self.clock_reader: PlayClockRegionExtractor = PlayClockRegionExtractor(region_config_path=cfg.clock_region_config_path)
        logger.info("Play clock region extractor initialized")
    else:
        # Fixed coordinates mode: regions are pre-set, same downstream logic
        logger.info("Fixed coordinates mode - regions pre-configured")
        pc_x, pc_y, pc_w, pc_h = fixed_clock

        if cfg.fixed_scorebug_coords:
            # Express the play clock position relative to the scorebug origin
            sb_x, sb_y, _, _ = cfg.fixed_scorebug_coords
            x_offset, y_offset = pc_x - sb_x, pc_y - sb_y
        else:
            # No scorebug coords: the absolute coords double as an offset from (0, 0)
            x_offset, y_offset = pc_x, pc_y

        # Minimal region config carrying only the geometry
        playclock_config = PlayClockRegionConfig(
            x_offset=x_offset,
            y_offset=y_offset,
            width=pc_w,
            height=pc_h,
            source_video="",
            scorebug_template="",
            samples_used=0,
        )

        # The template is still passed in this mode so the detector can notice
        # when the scorebug disappears (e.g., during commercials or replays)
        self.scorebug_detector = DetectScoreBug(
            template_path=cfg.template_path,
            fixed_region=cfg.fixed_scorebug_coords,
            use_split_detection=cfg.use_split_detection,
        )
        logger.info("Scorebug detector initialized with template and fixed region: %s", cfg.fixed_scorebug_coords)

        self.clock_reader = PlayClockRegionExtractor(region_config=playclock_config)
        logger.info("Play clock region extractor initialized with offset=(%d, %d), size=(%d, %d)", x_offset, y_offset, pc_w, pc_h)

    self.state_machine: TrackPlayState = TrackPlayState()
    logger.info("State machine initialized")

    # Region size for template matching: taken from the clock reader's config,
    # falling back to 50x28 when no config is available
    if self.clock_reader and self.clock_reader.config:
        region_w, region_h = self.clock_reader.config.width, self.clock_reader.config.height
    else:
        region_w, region_h = 50, 28

    # Prefer pre-built digit templates when a usable path was configured
    if cfg.digit_template_path and Path(cfg.digit_template_path).exists():
        library = DigitTemplateLibrary()
        self.template_library = library
        if library.load(cfg.digit_template_path):
            logger.info("Loaded pre-built digit templates from %s", cfg.digit_template_path)
            self.template_reader = ReadPlayClock(library, region_w, region_h)
        else:
            self.template_library = None
            logger.info("Could not load templates, will build during extraction")

    # Without a loaded library, templates have to be collected during extraction
    if self.template_library is None:
        self.template_builder = DigitTemplateBuilder(region_w, region_h)
        logger.info("Template builder initialized for collection phase")
def _open_video_and_get_context(self) -> Tuple[VideoContext, Dict[str, Any], Dict[str, float]]:
    """
    Open the video and initialize the processing context.

    Reads the frame rate and frame count from the capture, derives the segment
    bounds from the configuration and converts them to frame indices.

    Returns:
        Tuple of (VideoContext, stats dict, timing dict). The capture stored in
        the context is left open; the consumer is responsible for releasing it.

    Raises:
        RuntimeError: If the video cannot be opened or reports a non-positive
            frame rate
    """
    cap = cv2.VideoCapture(self.config.video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {self.config.video_path}")

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # A non-positive (or NaN) frame rate makes every time <-> frame conversion
    # below meaningless and previously surfaced later as a ZeroDivisionError.
    # Fail here with a clear message and do not leak the capture handle.
    if not fps > 0:
        cap.release()
        raise RuntimeError(f"Could not determine frame rate for video: {self.config.video_path}")

    duration = total_frames / fps
    logger.info("Video: %.1fs duration, %.2f fps, %d total frames", duration, fps, total_frames)

    # Determine segment bounds (a falsy end_time means "until the end of the video")
    start_time = self.config.start_time
    end_time = self.config.end_time if self.config.end_time else duration

    # Frame step for sequential reading. int() truncates, so a sampling interval
    # shorter than one frame would yield 0; clamp to 1 (process every frame).
    frame_skip = max(1, int(self.config.frame_interval * fps))
    start_frame = int(start_time * fps)
    end_frame = int(end_time * fps)

    # Initialize stats and timing
    stats = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0}
    timing = {
        "video_io": 0.0,
        "scorebug_detection": 0.0,
        "preprocessing": 0.0,
        "template_matching": 0.0,
        "template_building": 0.0,
        "state_machine": 0.0,
    }

    context = VideoContext(
        cap=cap,
        fps=fps,
        total_frames=total_frames,
        duration=duration,
        start_time=start_time,
        end_time=end_time,
        frame_skip=frame_skip,
        start_frame=start_frame,
        end_frame=end_frame,
    )
    return context, stats, timing
def _pass0_build_templates_with_real_detection(self, timing: Dict[str, float], progress_dict: Optional[Dict[str, Any]] = None) -> bool:
    """
    Pass 0: build digit templates by delegating to TemplateBuildingPass.

    The pass is constructed from this extractor's config, clock reader and
    template builder, and its results replace self.template_library and
    self.template_reader.

    Args:
        timing: Timing dictionary; its "template_building" entry is overwritten
            with the build time reported by the pass
        progress_dict: Optional dict for progress updates, forwarded to the pass

    Returns:
        True if a template library was produced, False otherwise
    """
    # The builder must already exist when this method is called
    assert self.template_builder is not None

    builder_pass = TemplateBuildingPass(
        config=self.config,
        clock_reader=self.clock_reader,
        template_builder=self.template_builder,
        progress_dict=progress_dict,
    )

    # Run the pass, then publish its outputs on the extractor
    library, reader, elapsed = builder_pass.run()
    self.template_library = library
    self.template_reader = reader
    timing["template_building"] = elapsed

    return self.template_library is not None
def _streaming_extraction_pass(self, context: VideoContext, stats: Dict[str, Any], timing: Dict[str, float]) -> List[Dict[str, Any]]:
    """
    Streaming extraction pass: read frames and process each one immediately.

    Frames are pulled from a ThreadedFrameReader and handed to
    _process_frame_streaming(), which performs scorebug detection, play clock
    reading and the state machine update for that frame. The per-frame result
    dictionaries are collected and returned. The reader is stopped and the
    capture released when the loop ends, including on error.

    Args:
        context: Video context with properties and capture object
        stats: Stats dictionary to update ("total_frames" is incremented here)
        timing: Timing dictionary to update ("video_io" is set from the reader)

    Returns:
        List of frame data dictionaries with all processing results
    """
    # A zero frame step would divide by zero in the log line below and gives the
    # reader no forward progress to make; treat it as "every frame".
    frame_skip = max(1, context.frame_skip)

    logger.info("Streaming extraction pass: frame extraction + template matching...")
    logger.info(
        "Threaded reading: frame_skip=%d (%.2f fps effective), frames %d-%d",
        frame_skip,
        context.fps / frame_skip,
        context.start_frame,
        context.end_frame,
    )

    # Start threaded frame reader
    frame_reader = ThreadedFrameReader(context.cap, context.start_frame, context.end_frame, frame_skip, queue_size=32)
    frame_reader.start()

    # Data structures for results
    frame_data: List[Dict[str, Any]] = []

    # Flag to track if we've locked the scorebug region
    scorebug_region_locked = self.scorebug_detector.is_fixed_region_mode if self.scorebug_detector else False

    # Log roughly every 30 seconds of video. int() truncates to 0 for sampling
    # intervals above 30s, which would make the modulo below divide by zero, so
    # the interval is clamped to at least one frame.
    frame_interval = self.config.frame_interval
    progress_interval = max(1, int(30 / frame_interval)) if frame_interval > 0 else 1

    # Segment length for the progress percentage; may be zero for an empty segment
    segment_duration = context.end_time - context.start_time

    try:
        while True:
            # Get next frame from background reader
            result = frame_reader.get_frame(timeout=10.0)
            if result is None:
                break  # End of stream

            current_frame, frame = result
            current_time = current_frame / context.fps

            if frame is None:
                logger.warning("Could not read frame %d at %.1fs", current_frame, current_time)
                continue

            stats["total_frames"] += 1

            # Process frame with immediate template matching
            frame_result = self._process_frame_streaming(frame, current_time, timing, stats, scorebug_region_locked)

            # Update scorebug lock status once a frame with a detected scorebug is seen
            if not scorebug_region_locked and frame_result.get("scorebug_detected"):
                if self.scorebug_detector.discover_and_lock_region(frame):
                    scorebug_region_locked = True
                    logger.info("Scorebug region locked at %s", self.scorebug_detector.fixed_region)

            frame_data.append(frame_result)

            # Progress logging
            if stats["total_frames"] % progress_interval == 0:
                if segment_duration > 0:
                    progress_pct = 100 * (current_time - context.start_time) / segment_duration
                else:
                    progress_pct = 100.0
                logger.info("Extraction progress: %.1fs / %.1fs (%.0f%%)", current_time, context.end_time, progress_pct)
    finally:
        # Stop the reader thread and get I/O timing
        frame_reader.stop()
        timing["video_io"] = frame_reader.io_time
        context.cap.release()

    logger.info(
        "Streaming extraction complete: %d frames processed, %d with scorebug, %d with clock",
        stats["total_frames"],
        stats["frames_with_scorebug"],
        stats["frames_with_clock"],
    )
    return frame_data
def _process_frame_streaming(
    self,
    frame: np.ndarray[Any, Any],
    current_time: float,
    timing: Dict[str, float],
    stats: Dict[str, Any],
    scorebug_region_locked: bool,
) -> Dict[str, Any]:
    """
    Process a single frame with immediate template matching.

    This is the streaming version that processes each frame completely
    without storing intermediate data. For each frame it:
    1. Runs scorebug detection (attempting to discover/lock the region first
       when it is not locked yet)
    2. When a scorebug is detected: optionally reads timeout and FLAG
       indicators, reads the play clock, and feeds the reading to the state machine
    3. When no scorebug is detected: feeds a "NO_SCOREBUG" reading to the state machine

    The timing and stats dictionaries are mutated in place.

    NOTE(review): the block nesting below was reconstructed from a flattened
    copy of this file. As reconstructed, the state machine is not updated for a
    frame whose scorebug is detected but for which no clock result was produced
    (no template reader, or no extracted region) - confirm against the
    original indentation.

    Args:
        frame: The video frame
        current_time: Current timestamp
        timing: Timing dictionary to update
        stats: Stats dictionary to update
        scorebug_region_locked: Whether the scorebug region has been locked

    Returns:
        Frame data dictionary with all processing results
    """
    # Detect scorebug (region discovery is attempted first while unlocked;
    # both steps are counted as scorebug detection time)
    t_start = time.perf_counter()
    if not scorebug_region_locked:
        self.scorebug_detector.discover_and_lock_region(frame)
    scorebug = self.scorebug_detector.detect(frame)
    timing["scorebug_detection"] += time.perf_counter() - t_start
    # Initialize frame result using shared factory
    frame_result = create_frame_result(
        timestamp=current_time,
        scorebug_detected=scorebug.detected,
        scorebug_bbox=scorebug.bbox if scorebug.detected else None,
    )
    # Initialize timeout_info and flag_info for state machine
    # (they stay None unless the corresponding reader runs below)
    timeout_info = None
    flag_info = None
    # Determine if scorebug is actually visible (vs just assumed present in fixed coords mode)
    # template_matched takes precedence when it is set; otherwise fall back to detected
    scorebug_actually_visible = scorebug.template_matched if scorebug.template_matched is not None else scorebug.detected
    if scorebug.detected:
        stats["frames_with_scorebug"] += 1
        # Read timeout indicators only when scorebug is actually visible
        # to avoid garbage readings during commercials/replays
        if self.timeout_tracker and self.timeout_tracker.is_configured() and scorebug_actually_visible:
            timeout_reading = self.timeout_tracker.read_timeouts(frame)
            frame_result["home_timeouts"] = timeout_reading.home_timeouts
            frame_result["away_timeouts"] = timeout_reading.away_timeouts
            frame_result["timeout_confidence"] = timeout_reading.confidence
            # Create TimeoutInfo for state machine clock reset classification
            timeout_info = TimeoutInfo(
                home_timeouts=timeout_reading.home_timeouts,
                away_timeouts=timeout_reading.away_timeouts,
                confidence=timeout_reading.confidence,
            )
        # Read FLAG indicator if reader is configured
        # Only read flags when scorebug is actually visible (template_matched)
        # to avoid false positives during commercials/replays
        if self.flag_reader and scorebug_actually_visible:
            assert scorebug.bbox is not None  # scorebug.detected implies bbox is set
            flag_reading = self.flag_reader.read(frame, scorebug.bbox)
            frame_result["flag_detected"] = flag_reading.detected
            frame_result["flag_yellow_ratio"] = flag_reading.yellow_ratio
            frame_result["flag_mean_hue"] = flag_reading.mean_hue
            # Create FlagInfo for state machine with actual visibility status
            flag_info = FlagInfo(
                detected=flag_reading.detected,
                yellow_ratio=flag_reading.yellow_ratio,
                mean_hue=flag_reading.mean_hue,
                is_valid_yellow=flag_reading.is_valid_yellow,
                scorebug_verified=scorebug_actually_visible,
            )
        # Extract play clock region and run template matching immediately
        # Use padded region when in fixed coordinates mode for shift-invariant matching
        clock_result = None
        t_start = time.perf_counter()
        if self.config.fixed_playclock_coords and self.template_reader:
            # Fixed coordinates mode: use read_from_fixed_location with padding
            # Padding of 4 pixels handles small translational shifts in the broadcast
            # No separate region extraction happens on this path, so the
            # preprocessing increment here only covers the time since t_start was set
            timing["preprocessing"] += time.perf_counter() - t_start
            t_start = time.perf_counter()
            clock_result = self.template_reader.read_from_fixed_location(frame, self.config.fixed_playclock_coords, padding=4)
            timing["template_matching"] += time.perf_counter() - t_start
        else:
            # Standard mode: extract region then match
            assert scorebug.bbox is not None  # scorebug.detected implies bbox is set
            play_clock_region = self.clock_reader.extract_region(frame, scorebug.bbox)
            timing["preprocessing"] += time.perf_counter() - t_start
            if play_clock_region is not None and self.template_reader:
                # Run template matching immediately (no intermediate storage!)
                t_start = time.perf_counter()
                clock_result = self.template_reader.read(play_clock_region)
                timing["template_matching"] += time.perf_counter() - t_start
        # Process clock result (common path for both fixed and standard modes)
        if clock_result is not None:
            frame_result["clock_detected"] = clock_result.detected
            frame_result["clock_value"] = clock_result.value
            if clock_result.detected:
                stats["frames_with_clock"] += 1
            # Update state machine immediately with timeout and flag info
            t_start = time.perf_counter()
            # raw_text tags the reading as template-derived ("TEMPLATE_<value>" or "TEMPLATE_FAILED")
            clock_reading = PlayClockReading(
                detected=clock_result.detected,
                value=clock_result.value,
                confidence=clock_result.confidence,
                raw_text=f"TEMPLATE_{clock_result.value}" if clock_result.detected else "TEMPLATE_FAILED",
            )
            self.state_machine.update(current_time, scorebug, clock_reading, timeout_info, flag_info)
            timing["state_machine"] += time.perf_counter() - t_start
    else:
        # No scorebug - still update state machine
        # (timeout_info and flag_info are both None on this path)
        t_start = time.perf_counter()
        clock_reading = PlayClockReading(detected=False, value=None, confidence=0.0, raw_text="NO_SCOREBUG")
        self.state_machine.update(current_time, scorebug, clock_reading, timeout_info, flag_info)
        timing["state_machine"] += time.perf_counter() - t_start
    return frame_result
def _finalize_extraction(
    self,
    context: VideoContext,
    stats: Dict[str, Any],
    timing: Dict[str, float],
    frame_data: List[Dict[str, Any]],
) -> DetectionResult:
    """
    Finish an extraction run and assemble the DetectionResult.

    Steps, in order:
    1. Log the timing breakdown
    2. Finalize the state machine at the segment end and collect its plays,
       stats and FLAG plays
    3. Run ClockResetIdentifier over the frame data (post-hoc 40→25 clock reset
       identification; its stats carry "total", "weird_clock", "timeout" and
       "special" counts)
    4. Merge state machine plays, clock reset plays and FLAG plays with PlayMerger
    5. Recompute per-method / per-type counts from the merged plays
    6. Build the result and log a summary

    Args:
        context: Video context (segment bounds are read from it)
        stats: Processing stats
        timing: Timing breakdown
        frame_data: List of frame data dicts with clock values and timeout counts

    Returns:
        Final DetectionResult
    """
    self._log_timing_breakdown(timing)

    # Close out the state machine (any active FLAG events end at the segment end)
    tracker = self.state_machine
    tracker.finalize(context.end_time)
    state_machine_plays = tracker.get_plays()
    play_stats = tracker.get_stats()

    # FLAG plays are reported separately by the state machine
    flag_plays = tracker.get_flag_plays()
    logger.info("FLAG plays detected: %d", len(flag_plays))
    log_flag_plays(flag_plays, logger)

    # Post-hoc clock reset identification (40→25 transitions)
    clock_reset_plays, clock_reset_stats = ClockResetIdentifier().identify(frame_data)
    logger.info(
        "Clock reset identification: %d total, %d weird (rejected), %d timeouts, %d special plays",
        clock_reset_stats.get("total", 0),
        clock_reset_stats.get("weird_clock", 0),
        clock_reset_stats.get("timeout", 0),
        clock_reset_stats.get("special", 0),
    )
    play_stats["clock_reset_events"] = clock_reset_stats

    # Combine all three play sources into the final play list
    plays = PlayMerger().merge(state_machine_plays, clock_reset_plays, flag_plays)

    # Tally the merged plays by start method, end method and play type
    start_methods: Dict[str, int] = {}
    end_methods: Dict[str, int] = {}
    play_types: Dict[str, int] = {}
    for merged_play in plays:
        tallies = (
            (start_methods, merged_play.start_method),
            (end_methods, merged_play.end_method),
            (play_types, merged_play.play_type),
        )
        for bucket, key in tallies:
            bucket[key] = bucket.get(key, 0) + 1
    flag_plays_count = sum(1 for merged_play in plays if merged_play.play_type == "flag")

    play_stats["total_plays"] = len(plays)
    play_stats["start_methods"] = start_methods
    play_stats["end_methods"] = end_methods
    play_stats["play_types"] = play_types
    play_stats["flag_plays"] = flag_plays_count

    total = stats["total_frames"]
    with_scorebug = stats["frames_with_scorebug"]
    with_clock = stats["frames_with_clock"]

    result = DetectionResult(
        video=Path(self.config.video_path).name,
        segment_start=context.start_time,
        segment_end=context.end_time,
        total_frames_processed=total,
        frames_with_scorebug=with_scorebug,
        frames_with_clock=with_clock,
        plays=[self._play_to_dict(merged_play) for merged_play in plays],
        stats=play_stats,
        timing=timing,
    )

    # Final summary; max(1, ...) keeps the percentages defined when no frame was processed
    denominator = max(1, total)
    logger.info("Extraction complete!")
    logger.info("Processed %d frames", total)
    logger.info("Frames with scorebug: %d (%.1f%%)", with_scorebug, 100 * with_scorebug / denominator)
    logger.info("Frames with clock: %d (%.1f%%)", with_clock, 100 * with_clock / denominator)
    logger.info("Plays extracted: %d", len(plays))
    return result
def _log_timing_breakdown(self, timing: Dict[str, float]) -> None:
    """
    Log each timing section with its share of the total, followed by the total.

    Args:
        timing: Mapping of section name to elapsed seconds
    """
    total_time = sum(timing.values())
    rule = "=" * 50

    logger.info(rule)
    logger.info("TIMING BREAKDOWN")
    logger.info(rule)
    for section in timing:
        elapsed = timing[section]
        # Report 0% for every section when nothing was timed at all
        if total_time > 0:
            share = 100 * elapsed / total_time
        else:
            share = 0
        logger.info(" %s: %.2fs (%.1f%%)", section, elapsed, share)
    logger.info(" TOTAL: %.2fs", total_time)
    logger.info(rule)
def extract(self) -> DetectionResult:
    """
    Run play extraction on the configured video segment.

    The run consists of:
    - Pass 0 (only when no template reader exists yet but a template builder
      does): build digit templates via _pass0_build_templates_with_real_detection()
    - Streaming pass: _streaming_extraction_pass() reads and processes frames
    - Finalize: _finalize_extraction() performs clock reset identification and
      builds the result

    Returns:
        DetectionResult with all extracted plays
    """
    cfg = self.config
    logger.info("Starting play extraction...")
    logger.info("Video: %s", cfg.video_path)
    logger.info("Segment: %.1fs to %s", cfg.start_time, cfg.end_time or "end")

    # Timing accumulators are created up front because Pass 0 records into them
    timing = dict.fromkeys(
        (
            "video_io",
            "scorebug_detection",
            "preprocessing",
            "template_matching",
            "template_building",
            "state_machine",
        ),
        0.0,
    )

    # Pass 0 runs only when templates still have to be built; a failure is
    # logged but does not abort the run
    if not self.template_reader and self.template_builder and not self._pass0_build_templates_with_real_detection(timing):
        logger.warning("Pass 0 failed to build templates, extraction may fail or be inaccurate")

    # Mode info is logged after Pass 0 so it can reflect whether templates were built
    self._log_extraction_mode()

    # The timing dict returned here is discarded in favour of the one above,
    # which may already hold the Pass 0 build time
    context, stats, _ = self._open_video_and_get_context()

    # Per-frame results are needed afterwards for clock reset identification
    frame_data = self._streaming_extraction_pass(context, stats, timing)

    return self._finalize_extraction(context, stats, timing, frame_data)
| # pylint: disable=too-many-locals | |
| def extract_parallel(self, num_workers: int = 2, output_dir: Optional[Path] = None, progress_dict: Optional[Dict[str, Any]] = None) -> DetectionResult: | |
| """ | |
| Run play extraction using parallel chunk processing. | |
| This provides ~26% speedup over sequential processing by using multiple | |
| processes to read and process different segments of the video simultaneously. | |
| Process: | |
| 1. Pass 0: Build digit templates (single-threaded, required for clock reading) | |
| 2. Save templates to disk for worker processes to load | |
| 3. Parallel pass: Each worker processes a video chunk independently | |
| 4. Merge: Combine frame data from all chunks in chronological order | |
| 5. State machine: Process merged data to extract plays | |
| Args: | |
| num_workers: Number of parallel workers (default 2). | |
| output_dir: Output directory for templates (required). | |
| progress_dict: Optional dict to receive progress updates with keys: | |
| overall_pct (0-100), worker_pcts (dict), complete (bool). | |
| Returns: | |
| DetectionResult with all extracted plays | |
| """ | |
| logger.info("Starting parallel play extraction (%d workers)...", num_workers) | |
| logger.info("Video: %s", self.config.video_path) | |
| logger.info("Segment: %.1fs to %s", self.config.start_time, self.config.end_time or "end") | |
| # Initialize timing dict | |
| timing = { | |
| "video_io": 0.0, | |
| "scorebug_detection": 0.0, | |
| "preprocessing": 0.0, | |
| "template_matching": 0.0, | |
| "template_building": 0.0, | |
| "state_machine": 0.0, | |
| } | |
| # Pass 0: Build templates (required before parallel processing) | |
| if not self.template_reader and self.template_builder: | |
| success = self._pass0_build_templates_with_real_detection(timing, progress_dict=progress_dict) | |
| if not success: | |
| logger.warning("Pass 0 failed to build templates, extraction may fail or be inaccurate") | |
| # Save templates to disk for worker processes | |
| template_path = None | |
| if self.template_library and output_dir: | |
| template_path = output_dir / "debug" / "digit_templates" | |
| self.template_library.save(str(template_path)) | |
| logger.info("Templates saved to %s for parallel workers", template_path) | |
| # Get video duration for end_time if not specified | |
| cap = cv2.VideoCapture(self.config.video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| duration = total_frames / fps if fps > 0 else 0 | |
| cap.release() | |
| end_time = self.config.end_time if self.config.end_time else duration | |
| # Get timeout config path | |
| timeout_config_path = None | |
| if self.timeout_tracker and output_dir: | |
| timeout_config_path = str(output_dir / f"{Path(self.config.video_path).stem.replace(' ', '_').replace('.', '_')}_timeout_config.json") | |
| if not Path(timeout_config_path).exists(): | |
| timeout_config_path = None | |
| # Run parallel processing | |
| logger.info("Starting parallel frame extraction...") | |
| t_parallel_start = time.perf_counter() | |
| # Create parallel processing config | |
| # Asserts: validated by _validate_config, parallel mode requires fixed coords | |
| assert self.config.fixed_playclock_coords is not None | |
| assert self.config.fixed_scorebug_coords is not None | |
| # Extract FLAG region config from flag_reader if available | |
| flag_x_offset = None | |
| flag_y_offset = None | |
| flag_width = None | |
| flag_height = None | |
| if self.flag_reader: | |
| flag_x_offset = self.flag_reader.flag_x_offset | |
| flag_y_offset = self.flag_reader.flag_y_offset | |
| flag_width = self.flag_reader.flag_width | |
| flag_height = self.flag_reader.flag_height | |
| parallel_config = ParallelProcessingConfig( | |
| video_path=self.config.video_path, | |
| start_time=self.config.start_time, | |
| end_time=end_time, | |
| frame_interval=self.config.frame_interval, | |
| fixed_playclock_coords=self.config.fixed_playclock_coords, | |
| fixed_scorebug_coords=self.config.fixed_scorebug_coords, | |
| template_library_path=str(template_path) if template_path else None, | |
| timeout_config_path=timeout_config_path, | |
| scorebug_template_path=self.config.template_path, # For scorebug verification during FLAG detection | |
| flag_x_offset=flag_x_offset, | |
| flag_y_offset=flag_y_offset, | |
| flag_width=flag_width, | |
| flag_height=flag_height, | |
| ) | |
| frame_data, stats, io_time = process_video_parallel(parallel_config, num_workers=num_workers, external_progress_dict=progress_dict) | |
| timing["video_io"] = io_time | |
| # Estimate template matching time from parallel processing | |
| parallel_time = time.perf_counter() - t_parallel_start | |
| timing["template_matching"] = max(0, parallel_time - io_time - timing["template_building"]) | |
| logger.info("Parallel processing complete: %d frames", stats["total_frames"]) | |
| # Create a minimal context for finalization | |
| context = VideoContext( | |
| cap=None, | |
| fps=fps, | |
| total_frames=total_frames, | |
| duration=duration, | |
| start_time=self.config.start_time, | |
| end_time=end_time, | |
| frame_skip=int(self.config.frame_interval * fps), | |
| start_frame=int(self.config.start_time * fps), | |
| end_frame=int(end_time * fps), | |
| ) | |
| # Run state machine on merged frame data | |
| t_sm_start = time.perf_counter() | |
| for frame in frame_data: | |
| # Create proper objects for state machine | |
| # In fixed coords mode: detected=True (assumed), template_matched=actual visibility | |
| scorebug = ScorebugDetection( | |
| detected=frame.get("scorebug_detected", False), | |
| bbox=frame.get("scorebug_bbox"), | |
| confidence=1.0 if frame.get("scorebug_detected") else 0.0, | |
| template_matched=frame.get("scorebug_template_matched"), # For special play end detection | |
| ) | |
| clock_reading = PlayClockReading( | |
| detected=frame.get("clock_detected", False), | |
| value=frame.get("clock_value"), | |
| confidence=1.0 if frame.get("clock_detected") else 0.0, | |
| raw_text=f"PARALLEL_{frame.get('clock_value')}" if frame.get("clock_detected") else "PARALLEL_FAILED", | |
| ) | |
| # Create timeout info for clock reset classification | |
| timeout_info = None | |
| if frame.get("home_timeouts") is not None or frame.get("away_timeouts") is not None: | |
| timeout_info = TimeoutInfo( | |
| home_timeouts=frame.get("home_timeouts"), | |
| away_timeouts=frame.get("away_timeouts"), | |
| confidence=frame.get("timeout_confidence", 0.0), | |
| ) | |
| # Create FLAG info for penalty flag tracking | |
| # Use scorebug_template_matched (actual visibility) to filter false positives during replays/commercials | |
| flag_info = None | |
| if frame.get("flag_detected") is not None: | |
| # In fixed coords mode: scorebug_detected=True (assumed), scorebug_template_matched=actual visibility | |
| scorebug_actually_visible = frame.get("scorebug_template_matched") | |
| if scorebug_actually_visible is None: | |
| scorebug_actually_visible = frame.get("scorebug_detected", True) | |
| flag_info = FlagInfo( | |
| detected=frame.get("flag_detected", False), | |
| yellow_ratio=frame.get("flag_yellow_ratio", 0.0), | |
| mean_hue=frame.get("flag_mean_hue", 0.0), | |
| scorebug_verified=scorebug_actually_visible, | |
| ) | |
| self.state_machine.update(frame["timestamp"], scorebug, clock_reading, timeout_info, flag_info) | |
| timing["state_machine"] = time.perf_counter() - t_sm_start | |
| # Update stats dict | |
| stats_dict = { | |
| "total_frames": stats["total_frames"], | |
| "frames_with_scorebug": stats["frames_with_scorebug"], | |
| "frames_with_clock": stats["frames_with_clock"], | |
| } | |
| # Finalize: Post-hoc clock reset identification (Class A/B/C) and result building | |
| return self._finalize_extraction(context, stats_dict, timing, frame_data) | |
def _log_extraction_mode(self) -> None:
    """Emit INFO log lines describing the scorebug mode and clock-reading setup."""
    detector = self.scorebug_detector

    # Fixed-region mode requires a detector that reports is_fixed_region_mode.
    if detector and detector.is_fixed_region_mode:
        logger.info("Mode: Fixed region (scorebug location pre-configured)")
        region = detector.fixed_region
        if region:
            logger.info(" Scorebug region: %s", region)
    else:
        logger.info("Mode: Dynamic scorebug detection (will discover and lock region)")

    logger.info("Clock reading: Template matching (34x faster than OCR)")

    # Report whether a template reader is already available on this instance.
    template_status = (
        " Templates ready (built via real scorebug detection)"
        if self.template_reader
        else " Will build templates using fallback method"
    )
    logger.info(template_status)
def _play_to_dict(self, play: PlayEvent) -> Dict[str, Any]:
    """
    Build a JSON-serializable dictionary from a PlayEvent.

    Args:
        play: Play event whose attributes are copied into the dictionary

    Returns:
        Dictionary with the play's fields plus a derived "duration"
        (end_time minus start_time), in a fixed key order
    """
    began, finished = play.start_time, play.end_time

    # Timing fields come first so the key order matches the serialized layout.
    serialized: Dict[str, Any] = {
        "play_number": play.play_number,
        "start_time": began,
        "end_time": finished,
        "duration": finished - began,
    }

    # Remaining fields are copied through unchanged under the same name.
    passthrough_fields = (
        "confidence",
        "start_method",
        "end_method",
        "direct_end_time",
        "start_clock_value",
        "end_clock_value",
        "play_type",
        "has_flag",
    )
    for field_name in passthrough_fields:
        serialized[field_name] = getattr(play, field_name)

    return serialized
def save_results(self, result: DetectionResult, output_path: str) -> None:
    """
    Save extraction results to a JSON file.

    The payload is serialized to a string before the destination file is
    opened, so a serialization error raises without truncating or partially
    overwriting an existing results file.

    Args:
        result: Extraction results
        output_path: Path to output file (missing parent directories are created)

    Raises:
        TypeError: If the results contain a value json cannot serialize
        ValueError: If the results contain a circular reference
        OSError: If the directory cannot be created or the file cannot be written
    """
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)

    data = format_extraction_result_dict(result)
    # Include configuration if provided (for reproducibility)
    if result.config:
        data["config"] = result.config

    # Serialize first: json.dump() streams into the already-opened file and
    # would leave a truncated file behind if encoding failed part-way through.
    payload = json.dumps(data, indent=2)
    output.write_text(payload, encoding="utf-8")

    logger.info("Results saved to %s", output_path)