""" Clock reset identifier module for post-hoc 40→25 transition analysis. This module identifies and classifies 40→25 play clock reset events by analyzing frame data after the initial extraction pass. It complements the real-time TrackPlayState by catching timeout and special plays that the state machine may miss or classify differently. Classification (Class A/B/C): - Class A (weird_clock): 25 counts down immediately → rejected (false positive) - Class B (timeout): Timeout indicator changed → tracked as timeout play - Class C (special): Neither A nor B → special play (punt/FG/XP/injury) """ import logging from typing import Any, Dict, List, Optional, Tuple from .models import PlayEvent, determine_timeout_team logger = logging.getLogger(__name__) # pylint: disable=too-few-public-methods class ClockResetIdentifier: """ Identifies and classifies 40→25 clock reset events from frame data. This performs post-hoc analysis on extracted frame data to find timeout and special plays by looking for 40→25 clock transitions and classifying them based on subsequent behavior and timeout indicator changes. """ def __init__( self, immediate_countdown_window: float = 2.0, special_play_extension: float = 10.0, timeout_max_duration: float = 15.0, ): """ Initialize the clock reset identifier. Args: immediate_countdown_window: Seconds to check if 25 counts down (Class A filter) special_play_extension: Max duration for special plays (Class C) timeout_max_duration: Max duration for timeout plays (Class B) """ self.immediate_countdown_window = immediate_countdown_window self.special_play_extension = special_play_extension self.timeout_max_duration = timeout_max_duration def identify(self, frame_data: List[Dict[str, Any]]) -> Tuple[List[PlayEvent], Dict[str, int]]: """ Identify and classify 40→25 clock reset events in frame data. Scans through frame_data looking for 40→25 transitions and classifies each: - Class A (weird_clock): 25 counts down immediately → rejected - Class B (timeout): Timeout indicator changed → timeout play - Class C (special): Neither A nor B → special play Args: frame_data: List of frame data dicts with clock_value, timestamp, home_timeouts, away_timeouts, etc. Returns: Tuple of (list of PlayEvent for valid clock resets, stats dict) """ plays: List[PlayEvent] = [] stats = {"total": 0, "weird_clock": 0, "timeout": 0, "special": 0} prev_clock: Optional[int] = None for i, frame in enumerate(frame_data): clock_value = frame.get("clock_value") timestamp: float = frame["timestamp"] if clock_value is not None: # Identify 40 → 25 transition if prev_clock == 40 and clock_value == 25: stats["total"] += 1 # Check Class A: 25 immediately counts down (weird clock behavior) is_immediate_countdown = self._check_immediate_countdown(frame_data, i) # Check Class B: timeout indicator changed timeout_team = self._check_timeout_change(frame_data, i) if is_immediate_countdown: # Class A: Weird clock behavior - reject stats["weird_clock"] += 1 logger.debug("Clock reset at %.1fs: weird_clock (25 counts down immediately)", timestamp) elif timeout_team: # Class B: Team timeout stats["timeout"] += 1 play_end = self._find_play_end(frame_data, i, max_duration=self.timeout_max_duration) play = PlayEvent( play_number=0, start_time=timestamp, end_time=play_end, confidence=0.8, start_method=f"timeout_{timeout_team}", end_method="timeout_end", direct_end_time=play_end, start_clock_value=prev_clock, end_clock_value=25, play_type="timeout", ) plays.append(play) logger.debug("Clock reset at %.1fs: timeout (%s team)", timestamp, timeout_team) else: # Class C: Special play (punt/FG/XP/injury) stats["special"] += 1 play_end = self._find_play_end(frame_data, i, max_duration=self.special_play_extension) play_duration = play_end - timestamp end_method = "max_duration" if play_duration >= self.special_play_extension - 0.1 else "scorebug_disappeared" play = PlayEvent( play_number=0, start_time=timestamp, end_time=play_end, confidence=0.8, start_method="clock_reset_special", end_method=end_method, direct_end_time=play_end, start_clock_value=prev_clock, end_clock_value=25, play_type="special", ) plays.append(play) logger.debug("Clock reset at %.1fs: special play (%.1fs duration)", timestamp, play_end - timestamp) prev_clock = clock_value return plays, stats def _check_immediate_countdown(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> bool: """ Check if 25 immediately starts counting down (Class A filter). If the clock shows a value < 25 within the countdown window after the reset, this indicates weird clock behavior (false positive). Args: frame_data: Frame data list frame_idx: Index of frame where 40→25 reset occurred Returns: True if 25 counts down immediately (Class A), False otherwise """ reset_timestamp: float = frame_data[frame_idx]["timestamp"] for j in range(frame_idx + 1, len(frame_data)): frame = frame_data[j] elapsed = frame["timestamp"] - reset_timestamp if elapsed > self.immediate_countdown_window: break clock_value = frame.get("clock_value") if clock_value is not None and clock_value < 25: return True # 25 counted down - weird clock return False # Minimum confidence threshold for reliable timeout readings MIN_TIMEOUT_CONFIDENCE = 0.5 # Delay (seconds) after 40→25 before checking timeout indicators # Must be long enough for the scorebug timeout indicator to update (typically 4-8 seconds) TIMEOUT_CHECK_DELAY = 4.0 # Maximum time (seconds) after 40→25 to look for timeout indicator changes # Extended to 10s to catch slow-updating indicators (some take 8+ seconds) TIMEOUT_CHECK_MAX_DELAY = 10.0 def _check_timeout_change(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> Optional[str]: """ Check if a timeout indicator changed around the reset (Class B check). Compares timeout counts before and after the reset to determine if a team timeout was called. Validation rules for a valid timeout: 1. Exactly ONE team's count decreased by exactly 1 2. Other team's count stayed the same 3. Both before and after readings have confidence >= MIN_TIMEOUT_CONFIDENCE Args: frame_data: Frame data list frame_idx: Index of frame where 40→25 reset occurred Returns: "home" or "away" if timeout was used, None otherwise """ reset_timestamp: float = frame_data[frame_idx]["timestamp"] # Get timeout counts BEFORE reset (look back for high-confidence reading) before_home: Optional[int] = None before_away: Optional[int] = None before_conf: float = 0.0 for j in range(frame_idx - 1, max(0, frame_idx - 20), -1): frame = frame_data[j] conf = frame.get("timeout_confidence", 0.0) if frame.get("home_timeouts") is not None and conf >= self.MIN_TIMEOUT_CONFIDENCE: before_home = frame.get("home_timeouts") before_away = frame.get("away_timeouts") before_conf = conf break if before_home is None or before_away is None: return None # Look forward for timeout change AFTER DELAY (4-10 seconds after reset) target_time = reset_timestamp + self.TIMEOUT_CHECK_DELAY max_time = reset_timestamp + self.TIMEOUT_CHECK_MAX_DELAY after_home: Optional[int] = None after_away: Optional[int] = None after_conf: float = 0.0 for j in range(frame_idx + 1, len(frame_data)): frame = frame_data[j] timestamp: float = frame["timestamp"] # Only check after the delay period if timestamp < target_time: continue # Stop if we've gone past the search window if timestamp > max_time: break conf = frame.get("timeout_confidence", 0.0) if frame.get("home_timeouts") is not None and conf >= self.MIN_TIMEOUT_CONFIDENCE: after_home = frame.get("home_timeouts") after_away = frame.get("away_timeouts") after_conf = conf break if after_home is None or after_away is None: return None # Calculate changes and determine which team called timeout (if any) home_change = before_home - after_home # positive = decrease away_change = before_away - after_away # positive = decrease timeout_team = determine_timeout_team(home_change, away_change) if timeout_team: logger.debug( "Timeout detected: %s team (before=(%d,%d) conf=%.2f, after=(%d,%d) conf=%.2f)", timeout_team, before_home, before_away, before_conf, after_home, after_away, after_conf, ) return timeout_team def _find_play_end(self, frame_data: List[Dict[str, Any]], frame_idx: int, max_duration: float) -> float: """ Find the end time for a clock reset play. The play ends when EITHER: - Scorebug/clock disappears (cut to commercial/replay) - max_duration seconds have elapsed since the reset Whichever comes first. Args: frame_data: Frame data list frame_idx: Index of frame where 40→25 reset occurred max_duration: Maximum play duration from reset Returns: Play end timestamp """ start_timestamp: float = frame_data[frame_idx]["timestamp"] max_end_time = start_timestamp + max_duration # Look for scorebug disappearance (but cap at max_duration) for j in range(frame_idx + 1, len(frame_data)): frame = frame_data[j] timestamp: float = frame["timestamp"] # If we've exceeded max_duration, end at max_duration if timestamp >= max_end_time: return max_end_time # Check for clock/scorebug disappearance clock_available = frame.get("clock_detected", frame.get("scorebug_detected", False)) if not clock_available: return timestamp # Default: end at max_duration (or end of data if shorter) return min(max_end_time, float(frame_data[-1]["timestamp"]) if frame_data else max_end_time)