Spaces:
Sleeping
Sleeping
| """ | |
| Normal play tracker module. | |
| Handles standard play detection using clock reset to 40 and countdown confirmation. | |
| When a 40→25 transition is detected, signals for handoff to SpecialPlayTracker. | |
| Responsibilities: | |
| - PRE_SNAP: Watch for clock reset to 40 (normal play start) | |
| - PLAY_IN_PROGRESS: Track countdown, detect play end via backward calculation | |
| - Detect 40→25 transitions and signal for handoff to SpecialPlayTracker | |
| """ | |
| import logging | |
| from typing import Optional | |
| from utils import log_play_complete | |
| from .models import ( | |
| FlagInfo, | |
| NormalTrackerState, | |
| PlayEvent, | |
| PlayState, | |
| SpecialPlayHandoff, | |
| TimeoutInfo, | |
| TrackPlayStateConfig, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class NormalPlayTracker: | |
| """ | |
| Tracks normal plays using clock reset detection and countdown confirmation. | |
| Identification Strategy: | |
| - Play START: Clock resets to 40 (from lower value) | |
| - Play END: Backward calculation from consecutive descending clock ticks | |
| When a 40→25 transition is detected (either in PRE_SNAP or during play), | |
| sets request_special_handoff=True and populates pending_handoff with context | |
| for the SpecialPlayTracker to take over. | |
| """ | |
| def __init__(self, config: Optional[TrackPlayStateConfig] = None): | |
| """ | |
| Initialize the normal play tracker. | |
| Args: | |
| config: Configuration settings. Uses defaults if not provided. | |
| """ | |
| self.config = config or TrackPlayStateConfig() | |
| self._state = NormalTrackerState() | |
| self._play_count = 0 # Running count for play numbering | |
| self._pending_kickoff_play: Optional[PlayEvent] = None # For returning kickoff play from _start_play | |
| # ========================================================================= | |
| # Public properties | |
| # ========================================================================= | |
| def state(self) -> PlayState: | |
| """Current state of the tracker.""" | |
| return self._state.state | |
| def request_special_handoff(self) -> bool: | |
| """Whether a handoff to SpecialPlayTracker is requested.""" | |
| return self._state.request_special_handoff | |
| def pending_handoff(self) -> Optional[SpecialPlayHandoff]: | |
| """Handoff data for SpecialPlayTracker (if handoff requested).""" | |
| return self._state.pending_handoff | |
| # ========================================================================= | |
| # Main update method | |
| # ========================================================================= | |
| def update( | |
| self, | |
| timestamp: float, | |
| scorebug_detected: bool, | |
| clock_value: Optional[int], | |
| timeout_info: Optional[TimeoutInfo] = None, | |
| flag_info: Optional[FlagInfo] = None, # pylint: disable=unused-argument | |
| ) -> Optional[PlayEvent]: | |
| """ | |
| Update the tracker with new frame data. | |
| Args: | |
| timestamp: Current video timestamp in seconds | |
| scorebug_detected: Whether scorebug is visible | |
| clock_value: Play clock value (None if not detected) | |
| timeout_info: Optional timeout indicator information | |
| flag_info: Optional FLAG indicator information (kept for backward compatibility) | |
| Returns: | |
| PlayEvent if a play just ended, None otherwise | |
| """ | |
| # Update timeout tracking if provided (only store high-confidence readings) | |
| if timeout_info is not None and timeout_info.confidence >= 0.5: | |
| if timeout_info.home_timeouts is not None: | |
| self._state.last_home_timeouts = timeout_info.home_timeouts | |
| if timeout_info.away_timeouts is not None: | |
| self._state.last_away_timeouts = timeout_info.away_timeouts | |
| self._state.last_timeout_confidence = timeout_info.confidence | |
| # Note: FLAG tracking is now handled by FlagTracker (independent of NormalPlayTracker) | |
| # The flag_info parameter is kept for backward compatibility but is not used here | |
| # Handle scorebug presence/absence | |
| if not scorebug_detected: | |
| return self._handle_no_scorebug(timestamp) | |
| # Update last scorebug timestamp | |
| self._state.last_scorebug_timestamp = timestamp | |
| # Handle invalid clock reading | |
| if clock_value is None: | |
| self._handle_invalid_clock(timestamp) | |
| return None | |
| # Process valid clock reading | |
| return self._process_clock_value(timestamp, clock_value, timeout_info) | |
| def _process_clock_value(self, timestamp: float, clock_value: int, timeout_info: Optional[TimeoutInfo] = None) -> Optional[PlayEvent]: | |
| """Process a valid clock reading and update state.""" | |
| completed_play = None | |
| if self._state.state == PlayState.IDLE: | |
| # First clock reading - track consecutive readings before confirming kickoff | |
| # This filters out isolated clock readings during pre-game content | |
| self._state.state = PlayState.PRE_SNAP | |
| self._state.last_clock_value = clock_value | |
| self._state.last_clock_timestamp = timestamp | |
| self._state.clock_stable_count = 1 | |
| # Track consecutive readings for opening kickoff detection | |
| if not self._state.opening_kickoff_complete: | |
| self._track_opening_kickoff_reading(timestamp, clock_value) | |
| elif self._state.state == PlayState.PRE_SNAP: | |
| # Watching for play to start (may return completed opening kickoff play) | |
| completed_play = self._handle_pre_snap(timestamp, clock_value, timeout_info) | |
| elif self._state.state == PlayState.PLAY_IN_PROGRESS: | |
| # Play is live, watching for it to end | |
| completed_play = self._handle_play_in_progress(timestamp, clock_value, timeout_info) | |
| elif self._state.state == PlayState.POST_PLAY: | |
| # Play ended, transitioning back | |
| self._handle_post_play(timestamp, clock_value) | |
| elif self._state.state == PlayState.NO_SCOREBUG: | |
| # Scorebug returned after being lost | |
| completed_play = self._handle_scorebug_returned(timestamp, clock_value) | |
| # Update tracking | |
| self._state.last_clock_value = clock_value | |
| self._state.last_clock_timestamp = timestamp | |
| return completed_play | |
| # ========================================================================= | |
| # State handlers | |
| # ========================================================================= | |
| def _handle_no_scorebug(self, timestamp: float) -> Optional[PlayEvent]: | |
| """Handle case when scorebug is not visible.""" | |
| if self._state.state == PlayState.IDLE: | |
| return None | |
| # Reset consecutive reading count for opening kickoff detection | |
| # No scorebug means no clock reading - breaks the chain | |
| if not self._state.opening_kickoff_complete and not self._state.opening_kickoff_active: | |
| if self._state.opening_kickoff_consecutive_readings > 0: | |
| logger.debug("Opening kickoff: consecutive clock readings reset (no scorebug at %.1fs)", timestamp) | |
| self._state.opening_kickoff_consecutive_readings = 0 | |
| self._state.opening_kickoff_candidate_timestamp = None | |
| # Check if we've lost scorebug for too long | |
| if self._state.last_scorebug_timestamp is not None: | |
| time_since_scorebug = timestamp - self._state.last_scorebug_timestamp | |
| if time_since_scorebug > self.config.scorebug_lost_timeout: | |
| logger.warning("Scorebug lost for %.1fs, resetting to IDLE", time_since_scorebug) | |
| self._reset_state() | |
| return None | |
| # If we were in PLAY_IN_PROGRESS with significant time at 40, record the play | |
| if self._state.state == PlayState.PLAY_IN_PROGRESS and self._state.first_40_timestamp is not None: | |
| time_at_40 = (self._state.last_scorebug_timestamp - self._state.first_40_timestamp) if self._state.last_scorebug_timestamp else 0 | |
| min_time_for_play = 2.0 | |
| if time_at_40 > min_time_for_play: | |
| play_end_time = self._state.last_scorebug_timestamp if self._state.last_scorebug_timestamp else timestamp | |
| logger.info( | |
| "Scorebug disappeared during play at %.1fs (%.1fs at 40). Recording play end at %.1fs.", | |
| timestamp, | |
| time_at_40, | |
| play_end_time, | |
| ) | |
| completed_play = self._end_play_with_backward_calc(timestamp, 40, play_end_time) | |
| self._state.state = PlayState.NO_SCOREBUG | |
| return completed_play | |
| # Transition to NO_SCOREBUG state | |
| if self._state.state in (PlayState.PRE_SNAP, PlayState.PLAY_IN_PROGRESS, PlayState.POST_PLAY): | |
| logger.debug("Scorebug lost at %.1fs, entering NO_SCOREBUG state", timestamp) | |
| self._state.state = PlayState.NO_SCOREBUG | |
| return None | |
| def _handle_invalid_clock(self, timestamp: float) -> None: | |
| """Handle case when clock reading is invalid but scorebug is present.""" | |
| if self._state.state == PlayState.PRE_SNAP and self._state.last_clock_value is not None: | |
| logger.debug("Clock unreadable at %.1fs in PRE_SNAP state", timestamp) | |
| # Reset consecutive reading count for opening kickoff detection | |
| # Invalid clock breaks the chain of valid readings | |
| if not self._state.opening_kickoff_complete and not self._state.opening_kickoff_active: | |
| if self._state.opening_kickoff_consecutive_readings > 0: | |
| logger.debug("Opening kickoff: consecutive clock readings reset (invalid clock at %.1fs)", timestamp) | |
| self._state.opening_kickoff_consecutive_readings = 0 | |
| self._state.opening_kickoff_candidate_timestamp = None | |
| def _handle_pre_snap(self, timestamp: float, clock_value: int, timeout_info: Optional[TimeoutInfo] = None) -> Optional[PlayEvent]: | |
| """Handle clock reading during PRE_SNAP state. May return completed opening kickoff play.""" | |
| # Track consecutive clock readings for opening kickoff detection | |
| if not self._state.opening_kickoff_complete and not self._state.opening_kickoff_active: | |
| self._track_opening_kickoff_reading(timestamp, clock_value) | |
| if self._state.last_clock_value is None: | |
| self._state.last_clock_value = clock_value | |
| self._state.clock_stable_count = 1 | |
| # Initialize freeze tracking | |
| self._state.clock_freeze_start_timestamp = timestamp | |
| self._state.clock_freeze_value = clock_value | |
| return None | |
| # Check for clock reset to 40 (normal play start) | |
| max_prev_value = 40 - self.config.min_clock_jump_for_reset | |
| if clock_value == 40 and self._state.last_clock_value <= max_prev_value: | |
| logger.info("Play START identified at %.1fs (clock reset to 40 from %d)", timestamp, self._state.last_clock_value) | |
| self._state.current_play_clock_base = 40 | |
| self._state.current_play_type = "normal" | |
| self._reset_freeze_tracking() | |
| # _start_play will end opening kickoff if active and store the play | |
| self._pending_kickoff_play = None | |
| self._start_play(timestamp, "clock_reset", self._state.last_clock_value) | |
| # Return the kickoff play if one was created | |
| return self._pending_kickoff_play | |
| # Reject suspicious clock resets (likely OCR noise) | |
| if clock_value == 40 and self._state.last_clock_value > max_prev_value: | |
| logger.debug( | |
| "Ignoring suspicious clock reset at %.1fs (40 from %d, requires prev <= %d)", | |
| timestamp, | |
| self._state.last_clock_value, | |
| max_prev_value, | |
| ) | |
| return None | |
| # Check for 40→25 transition - signal handoff to SpecialPlayTracker | |
| if clock_value == 25 and self._state.last_clock_value == 40: | |
| self._reset_freeze_tracking() | |
| self._request_special_handoff(timestamp, timeout_info, was_in_play=False) | |
| return None | |
| # Fix 1.1: Detect clock freeze → reset pattern for special plays | |
| # Pattern: clock at low value (≤24) for 2+ seconds, then jumps to 25 | |
| # This indicates special play completion (punt, FG, XP, kickoff) | |
| if clock_value == 25 and self._state.last_clock_value is not None and self._state.last_clock_value <= 24: | |
| clock_jump = 25 - self._state.last_clock_value | |
| if clock_jump >= self.config.min_clock_jump_for_reset: | |
| # Check if clock was frozen at a low value before jumping to 25 | |
| freeze_duration = self._get_freeze_duration(timestamp) | |
| min_freeze_duration = 1.5 # Require at least 1.5 seconds of freeze | |
| if freeze_duration >= min_freeze_duration: | |
| logger.info( | |
| "Special play detected at %.1fs: clock jumped from %d to 25 (frozen for %.1fs)", | |
| timestamp, | |
| self._state.last_clock_value, | |
| freeze_duration, | |
| ) | |
| self._reset_freeze_tracking() | |
| # Create a special play directly (low→25 transition) | |
| self._request_special_handoff_from_freeze(timestamp, timeout_info, freeze_duration) | |
| return None | |
| # Short freeze - might be missed 40, log but still try to detect | |
| logger.debug( | |
| "Short freeze before jump to 25 at %.1fs (from %d, freeze=%.1fs < %.1fs) - treating as special play", | |
| timestamp, | |
| self._state.last_clock_value, | |
| freeze_duration, | |
| min_freeze_duration, | |
| ) | |
| # Still treat as potential special play even with short freeze | |
| # The SpecialPlayTracker will verify via countdown check | |
| self._reset_freeze_tracking() | |
| self._request_special_handoff_from_freeze(timestamp, timeout_info, freeze_duration) | |
| return None | |
| # Special handling for opening kickoff when clock is counting down from 40 | |
| # This handles Tennessee case: scorebug appears with clock at 40, then counts down | |
| if self._state.opening_kickoff_active and self._state.last_clock_value == 40 and clock_value < 40: | |
| # Clock started counting down from 40 - this means first play is starting | |
| logger.info("Play START detected at %.1fs (countdown from 40 during opening kickoff)", timestamp) | |
| self._state.current_play_clock_base = 40 | |
| self._state.current_play_type = "normal" | |
| self._reset_freeze_tracking() | |
| self._pending_kickoff_play = None | |
| # Calculate the play start time: clock was at 40, now it's at clock_value | |
| # So the play started (40 - clock_value) seconds ago | |
| play_start_time = timestamp - (40 - clock_value) | |
| self._start_play(play_start_time, "countdown_from_40", 40) | |
| return self._pending_kickoff_play | |
| # Track clock stability and freeze duration | |
| if clock_value == self._state.last_clock_value: | |
| self._state.clock_stable_count += 1 | |
| # Keep the existing freeze tracking | |
| else: | |
| self._state.clock_stable_count = 1 | |
| # Clock value changed - update freeze tracking | |
| self._update_freeze_tracking(timestamp, clock_value) | |
| return None | |
| def _handle_play_in_progress(self, timestamp: float, clock_value: int, timeout_info: Optional[TimeoutInfo] = None) -> Optional[PlayEvent]: | |
| """Handle clock reading during PLAY_IN_PROGRESS state.""" | |
| if self._state.current_play_start_time is None: | |
| return None | |
| # Check for play duration timeout | |
| result = self._check_play_timeout(timestamp, clock_value) | |
| if result is not None: | |
| return result | |
| # Handle clock still at 40 | |
| if clock_value == 40: | |
| self._handle_clock_at_40(timestamp) | |
| return None | |
| # Check for 40→25 transition during play (possession change) | |
| if self._check_possession_change(timestamp, clock_value, timeout_info): | |
| return None | |
| # Check for abnormal clock drop | |
| result = self._check_abnormal_clock_drop(timestamp, clock_value, timeout_info) | |
| if result is not None: | |
| return result | |
| # Check for freeze→25 transition during play (punt/FG/XP completion) | |
| # This handles: clock counts down (40→39→...→2), freezes at low value, then jumps to 25 | |
| if self._check_freeze_to_25_in_play(timestamp, clock_value, timeout_info): | |
| return None | |
| # Track clock stability and freeze duration during play | |
| # This enables freeze→25 detection when clock freezes mid-countdown | |
| if clock_value == self._state.last_clock_value: | |
| self._state.clock_stable_count += 1 | |
| # Keep existing freeze timestamp - clock is frozen at this value | |
| else: | |
| self._state.clock_stable_count = 1 | |
| # Clock value changed - update freeze tracking for low values | |
| self._update_freeze_tracking(timestamp, clock_value) | |
| # Check for countdown confirmation (play end) | |
| return self._check_countdown_confirmation(timestamp, clock_value) | |
| def _handle_post_play(self, timestamp: float, _clock_value: int) -> None: | |
| """Handle clock reading during POST_PLAY state.""" | |
| logger.debug("Transitioning from POST_PLAY to PRE_SNAP at %.1fs", timestamp) | |
| self._state.state = PlayState.PRE_SNAP | |
| self._state.clock_stable_count = 1 | |
| def _handle_scorebug_returned(self, timestamp: float, clock_value: int) -> Optional[PlayEvent]: | |
| """Handle scorebug returning after being lost.""" | |
| completed_play = None | |
| if self._state.current_play_start_time is not None: | |
| calculated_end_time = timestamp - (self._state.current_play_clock_base - clock_value) | |
| logger.info( | |
| "Scorebug returned at %.1fs (clock=%d, base=%d), backward calc play end: %.1fs", | |
| timestamp, | |
| clock_value, | |
| self._state.current_play_clock_base, | |
| calculated_end_time, | |
| ) | |
| completed_play = self._end_play_with_backward_calc(timestamp, clock_value, calculated_end_time) | |
| else: | |
| logger.debug("Scorebug returned at %.1fs, no play in progress", timestamp) | |
| self._state.state = PlayState.PRE_SNAP | |
| self._state.clock_stable_count = 1 | |
| return completed_play | |
| # ========================================================================= | |
| # Play identification checks | |
| # ========================================================================= | |
| def _check_play_timeout(self, timestamp: float, clock_value: int) -> Optional[PlayEvent]: | |
| """Check if play duration has exceeded maximum allowed time.""" | |
| if self._state.current_play_start_time is None: | |
| return None | |
| play_duration = timestamp - self._state.current_play_start_time | |
| if play_duration > self.config.max_play_duration: | |
| capped_end_time = self._state.current_play_start_time + self.config.max_play_duration | |
| logger.warning( | |
| "Play duration (%.1fs) exceeded max (%.1fs), forcing end at %.1fs", | |
| play_duration, | |
| self.config.max_play_duration, | |
| capped_end_time, | |
| ) | |
| self._state.direct_end_time = capped_end_time | |
| self._state.countdown_history = [] | |
| return self._end_play_capped(capped_end_time, clock_value, "max_duration") | |
| return None | |
| def _handle_clock_at_40(self, timestamp: float) -> None: | |
| """Handle case when clock is still at 40 (waiting for countdown).""" | |
| if self._state.first_40_timestamp is None: | |
| self._state.first_40_timestamp = timestamp | |
| logger.debug( | |
| "Play in progress at %.1fs, clock still at 40 (%.1fs at 40)", | |
| timestamp, | |
| timestamp - self._state.first_40_timestamp, | |
| ) | |
| self._state.countdown_history = [] | |
| def _check_possession_change(self, timestamp: float, clock_value: int, timeout_info: Optional[TimeoutInfo] = None) -> bool: | |
| """ | |
| Check for 40→25 transition during play indicating possession change. | |
| If detected, signals handoff to SpecialPlayTracker. | |
| Returns True if transition was handled, False otherwise. | |
| """ | |
| if clock_value != 25 or self._state.first_40_timestamp is None: | |
| return False | |
| time_at_40 = timestamp - self._state.first_40_timestamp | |
| max_time_for_possession_change = 5.0 | |
| min_time_at_40 = 0.5 | |
| if min_time_at_40 <= time_at_40 <= max_time_for_possession_change and len(self._state.countdown_history) == 0: | |
| logger.info( | |
| "Mid-play 40→25 transition at %.1fs (%.1fs at 40). Signaling handoff to SpecialPlayTracker.", | |
| timestamp, | |
| time_at_40, | |
| ) | |
| self._request_special_handoff(timestamp, timeout_info, was_in_play=True, play_start_time=self._state.current_play_start_time, time_at_40=time_at_40) | |
| return True | |
| return False | |
| def _check_abnormal_clock_drop(self, timestamp: float, clock_value: int, timeout_info: Optional[TimeoutInfo] = None) -> Optional[PlayEvent]: | |
| """ | |
| Check for abnormal clock drop on first reading after 40. | |
| For 40→25 transitions (possession changes), we hand off to SpecialPlayTracker | |
| to check for timeout indicator changes. The SpecialPlayTracker will determine | |
| if this is a timeout or a regular possession change. | |
| """ | |
| if len(self._state.countdown_history) != 0: | |
| return None | |
| if self._state.current_play_clock_base == 25: | |
| return None | |
| clock_drop = 40 - clock_value | |
| max_normal_drop = 5 | |
| if clock_drop <= max_normal_drop: | |
| return None | |
| time_at_40 = (timestamp - self._state.first_40_timestamp) if self._state.first_40_timestamp else 0 | |
| min_time_for_play = 2.0 | |
| # For 40→25 transitions, hand off to SpecialPlayTracker for timeout checking | |
| # regardless of how long clock was at 40. Timeouts can be called after turnovers too. | |
| if clock_value == 25: | |
| logger.info( | |
| "40→25 transition at %.1fs (%.1fs at 40). Handing off for timeout check.", | |
| timestamp, | |
| time_at_40, | |
| ) | |
| self._request_special_handoff(timestamp, timeout_info, was_in_play=True, play_start_time=self._state.current_play_start_time, time_at_40=time_at_40) | |
| return None | |
| # For other abnormal drops (not to 25), handle as before | |
| if time_at_40 > min_time_for_play: | |
| # Play happened - likely turnover to non-25 value | |
| play_end_time = timestamp - 1.0 | |
| logger.info( | |
| "Turnover/possession change at %.1fs: 40 → %d after %.1fs at 40. Recording play end at %.1fs.", | |
| timestamp, | |
| clock_value, | |
| time_at_40, | |
| play_end_time, | |
| ) | |
| return self._end_play_with_backward_calc(timestamp, clock_value, play_end_time) | |
| # Brief time at 40 - likely timeout/reset | |
| logger.warning( | |
| "Abnormal clock drop at %.1fs: 40 → %d (drop=%d, only %.1fs at 40). Likely timeout/reset. Resetting to PRE_SNAP.", | |
| timestamp, | |
| clock_value, | |
| clock_drop, | |
| time_at_40, | |
| ) | |
| self._reset_play_tracking() | |
| self._state.state = PlayState.PRE_SNAP | |
| return None | |
| def _check_countdown_confirmation(self, timestamp: float, clock_value: int) -> Optional[PlayEvent]: | |
| """Check for countdown confirmation to identify play end.""" | |
| self._state.countdown_history.append((timestamp, clock_value)) | |
| if len(self._state.countdown_history) < self.config.required_countdown_ticks: | |
| return None | |
| recent = self._state.countdown_history[-self.config.required_countdown_ticks :] | |
| values = [v for _, v in recent] | |
| # Check if values are strictly descending | |
| for i in range(1, len(values)): | |
| if values[i] > values[i - 1]: | |
| return None | |
| # Valid countdown confirmed | |
| first_timestamp, first_value = recent[0] | |
| calculated_end_time = first_timestamp - (self._state.current_play_clock_base - first_value) | |
| logger.info( | |
| "Play END confirmed via %d-tick countdown: %.1fs (clock=%d→%d, base=%d)", | |
| self.config.required_countdown_ticks, | |
| calculated_end_time, | |
| values[0], | |
| values[-1], | |
| self._state.current_play_clock_base, | |
| ) | |
| self._state.direct_end_time = timestamp | |
| self._state.countdown_history = [] | |
| return self._end_play_with_backward_calc(timestamp, first_value, calculated_end_time) | |
| def _check_freeze_to_25_in_play(self, timestamp: float, clock_value: int, timeout_info: Optional[TimeoutInfo] = None) -> bool: | |
| """ | |
| Check for freeze→25 transition during play (punt/FG/XP completion). | |
| Pattern: Clock counts down from 40, freezes at a low value (e.g., 2, 14) for | |
| 1+ seconds, then jumps directly to 25. This indicates a special play (punt, | |
| field goal, extra point, 2pt conversion) completed and the clock reset. | |
| This complements the freeze→25 detection in _handle_waiting_for_play, but | |
| applies when we're already in PLAY_IN_PROGRESS tracking a countdown. | |
| Returns: | |
| True if freeze→25 transition detected and handoff requested | |
| """ | |
| if clock_value != 25: | |
| return False | |
| # Check if we have a valid last clock value that could indicate freeze→25 | |
| if self._state.last_clock_value is None or self._state.last_clock_value > 24: | |
| return False | |
| # Check for clock freeze before the jump to 25 | |
| # This distinguishes a real freeze→25 (punt kicked, waiting) from a misread | |
| freeze_duration = self._get_freeze_duration(timestamp) | |
| min_freeze_duration = 1.0 # Require at least 1 second of freeze during play | |
| if freeze_duration >= min_freeze_duration: | |
| logger.info( | |
| "Mid-play freeze→25 detected at %.1fs: clock jumped from %d to 25 (frozen %.1fs). " "This is likely punt/FG/XP completion. Handing off to SpecialPlayTracker.", | |
| timestamp, | |
| self._state.last_clock_value, | |
| freeze_duration, | |
| ) | |
| self._reset_freeze_tracking() | |
| # Hand off to special play tracker with play context | |
| self._request_special_handoff_from_freeze(timestamp, timeout_info, freeze_duration) | |
| return True | |
| # Also check if we had countdown history showing the clock was counting down | |
| # and then jumped to 25 without intermediate values (even short freeze) | |
| if len(self._state.countdown_history) >= 2: | |
| # We have at least 2 countdown values, check if last one was low | |
| last_countdown_value = self._state.countdown_history[-1][1] | |
| if last_countdown_value <= 15: # Clock was low in countdown history | |
| logger.info( | |
| "Mid-play low→25 jump at %.1fs: clock was at %d (countdown), jumped to 25 (freeze=%.1fs). " "Treating as special play completion.", | |
| timestamp, | |
| last_countdown_value, | |
| freeze_duration, | |
| ) | |
| self._reset_freeze_tracking() | |
| self._request_special_handoff_from_freeze(timestamp, timeout_info, freeze_duration) | |
| return True | |
| return False | |
| # ========================================================================= | |
| # Handoff management | |
| # ========================================================================= | |
| def _request_special_handoff( | |
| self, | |
| timestamp: float, | |
| timeout_info: Optional[TimeoutInfo], | |
| was_in_play: bool, | |
| play_start_time: Optional[float] = None, | |
| time_at_40: float = 0.0, | |
| ) -> None: | |
| """ | |
| Request handoff to SpecialPlayTracker for 40→25 transition. | |
| Args: | |
| timestamp: When the 40→25 transition occurred | |
| timeout_info: Current timeout indicator information | |
| was_in_play: Whether a play was in progress | |
| play_start_time: Start time of play (if was_in_play=True) | |
| time_at_40: How long clock was at 40 before transitioning | |
| """ | |
| # Use current timeout_info if valid, otherwise use last known confident values | |
| # This ensures we have the "before" timeout counts even if scorebug isn't visible at transition | |
| if timeout_info and timeout_info.confidence >= 0.5: | |
| home_at_40 = timeout_info.home_timeouts | |
| away_at_40 = timeout_info.away_timeouts | |
| conf_at_40 = timeout_info.confidence | |
| else: | |
| home_at_40 = self._state.last_home_timeouts | |
| away_at_40 = self._state.last_away_timeouts | |
| conf_at_40 = self._state.last_timeout_confidence | |
| handoff = SpecialPlayHandoff( | |
| transition_timestamp=timestamp, | |
| home_timeouts_at_40=home_at_40, | |
| away_timeouts_at_40=away_at_40, | |
| timeout_confidence_at_40=conf_at_40, | |
| was_in_play=was_in_play, | |
| play_start_time=play_start_time, | |
| time_at_40=time_at_40, | |
| ) | |
| self._state.request_special_handoff = True | |
| self._state.pending_handoff = handoff | |
| logger.debug("Requested handoff to SpecialPlayTracker at %.1fs (was_in_play=%s)", timestamp, was_in_play) | |
| def clear_handoff_request(self) -> None: | |
| """Clear the handoff request after SpecialPlayTracker takes over.""" | |
| self._state.request_special_handoff = False | |
| self._state.pending_handoff = None | |
| def resume_after_special(self, clock_value: Optional[int] = None) -> None: | |
| """ | |
| Resume normal tracking after SpecialPlayTracker completes. | |
| Args: | |
| clock_value: Last clock value seen by SpecialPlayTracker (for continuity) | |
| """ | |
| self._reset_play_tracking() | |
| self._state.state = PlayState.PRE_SNAP | |
| if clock_value is not None: | |
| self._state.last_clock_value = clock_value | |
| logger.debug("Resumed normal tracking after special play handling") | |
| # ========================================================================= | |
| # Play lifecycle | |
| # ========================================================================= | |
| def _start_play(self, timestamp: float, method: str, clock_value: Optional[int]) -> None: | |
| """Record the start of a new play.""" | |
| # If opening kickoff is still active when first play starts, end it now | |
| # This creates the kickoff play from first_clock_reading to this play's start | |
| if self._state.opening_kickoff_active: | |
| kickoff_play = self._end_opening_kickoff(timestamp) | |
| # The kickoff play needs to be returned, but _start_play is void | |
| # Store it for the caller to retrieve | |
| self._pending_kickoff_play = kickoff_play | |
| self._state.current_play_start_time = timestamp | |
| self._state.current_play_start_method = method | |
| self._state.current_play_start_clock = clock_value | |
| self._state.countdown_history = [] | |
| self._state.state = PlayState.PLAY_IN_PROGRESS | |
| logger.debug("Play started: time=%.1fs, method=%s, clock=%s", timestamp, method, clock_value) | |
| def _end_play_with_backward_calc(self, observation_time: float, clock_value: int, calculated_end_time: float) -> Optional[PlayEvent]: # pylint: disable=unused-argument | |
| """End the current play using backward calculation for end time.""" | |
| start_time = self._state.current_play_start_time or calculated_end_time | |
| # Sanity check: end time must be after start time | |
| if calculated_end_time < start_time: | |
| logger.warning( | |
| "Rejecting invalid play: end time (%.1fs) before start time (%.1fs). Resetting state.", | |
| calculated_end_time, | |
| start_time, | |
| ) | |
| self._reset_play_tracking() | |
| self._state.state = PlayState.PRE_SNAP | |
| return None | |
| # Sanity check: reasonable duration | |
| duration = calculated_end_time - start_time | |
| min_duration = 0.0 if self._state.current_play_type == "special" else 0.5 | |
| if duration < min_duration: | |
| logger.warning("Rejecting invalid play: duration (%.1fs) too short. Resetting state.", duration) | |
| self._reset_play_tracking() | |
| self._state.state = PlayState.PRE_SNAP | |
| return None | |
| # Mark opening kickoff as complete when first play finishes | |
| # This prevents detecting a "kickoff" after the game has started | |
| if not self._state.opening_kickoff_complete: | |
| logger.debug("First play completed - marking opening kickoff tracking as complete") | |
| self._state.opening_kickoff_complete = True | |
| self._state.opening_kickoff_active = False | |
| self._state.opening_kickoff_consecutive_readings = 0 | |
| self._state.opening_kickoff_candidate_timestamp = None | |
| self._play_count += 1 | |
| play = PlayEvent( | |
| play_number=self._play_count, | |
| start_time=start_time, | |
| end_time=calculated_end_time, | |
| confidence=0.9, | |
| start_method=self._state.current_play_start_method or "unknown", | |
| end_method="backward_calc", | |
| direct_end_time=self._state.direct_end_time, | |
| start_clock_value=self._state.current_play_start_clock, | |
| end_clock_value=clock_value, | |
| play_type=self._state.current_play_type, | |
| # Note: has_flag is handled by FlagTracker now, FLAG plays are tracked separately | |
| ) | |
| # Track last play end time for continuation detection | |
| self._state.last_play_end_time = calculated_end_time | |
| self._reset_play_tracking() | |
| self._state.state = PlayState.POST_PLAY | |
| log_play_complete(play, "backward_calc", logger) | |
| return play | |
| def _end_play_capped(self, capped_end_time: float, clock_value: int, method: str) -> PlayEvent: | |
| """End the current play with a capped end time.""" | |
| # Mark opening kickoff as complete when first play finishes | |
| # This prevents detecting a "kickoff" after the game has started | |
| if not self._state.opening_kickoff_complete: | |
| logger.debug("First play completed (capped) - marking opening kickoff tracking as complete") | |
| self._state.opening_kickoff_complete = True | |
| self._state.opening_kickoff_active = False | |
| self._state.opening_kickoff_consecutive_readings = 0 | |
| self._state.opening_kickoff_candidate_timestamp = None | |
| self._play_count += 1 | |
| play = PlayEvent( | |
| play_number=self._play_count, | |
| start_time=self._state.current_play_start_time or capped_end_time, | |
| end_time=capped_end_time, | |
| confidence=0.7, | |
| start_method=self._state.current_play_start_method or "unknown", | |
| end_method=method, | |
| direct_end_time=self._state.direct_end_time, | |
| start_clock_value=self._state.current_play_start_clock, | |
| end_clock_value=clock_value, | |
| play_type=self._state.current_play_type, | |
| # Note: has_flag is handled by FlagTracker now, FLAG plays are tracked separately | |
| ) | |
| # Track last play end time for continuation detection | |
| self._state.last_play_end_time = capped_end_time | |
| self._reset_play_tracking() | |
| self._state.state = PlayState.POST_PLAY | |
| log_play_complete(play, "capped", logger) | |
| return play | |
| def _reset_play_tracking(self) -> None: | |
| """Reset tracking variables for next play.""" | |
| self._state.current_play_start_time = None | |
| self._state.current_play_start_method = None | |
| self._state.current_play_start_clock = None | |
| self._state.direct_end_time = None | |
| self._state.clock_stable_count = 0 | |
| self._state.countdown_history = [] | |
| self._state.first_40_timestamp = None | |
| self._state.current_play_clock_base = 40 | |
| self._state.current_play_type = "normal" | |
| # Note: FLAG tracking is now handled by FlagTracker | |
| self._reset_freeze_tracking() | |
| # ========================================================================= | |
| # Opening kickoff handling | |
| # ========================================================================= | |
| def _track_opening_kickoff_reading(self, timestamp: float, clock_value: int) -> None: | |
| """ | |
| Track consecutive clock readings to confirm opening kickoff start. | |
| Requires k consecutive valid clock readings before starting kickoff tracking. | |
| This filters out isolated/sporadic clock readings during pre-game content. | |
| Args: | |
| timestamp: Current video timestamp | |
| clock_value: Current play clock value | |
| """ | |
| # Increment consecutive reading count | |
| self._state.opening_kickoff_consecutive_readings += 1 | |
| # Record candidate start timestamp on first reading | |
| if self._state.opening_kickoff_candidate_timestamp is None: | |
| self._state.opening_kickoff_candidate_timestamp = timestamp | |
| logger.debug("Opening kickoff: candidate start at %.1fs (clock=%d), tracking consecutive readings...", timestamp, clock_value) | |
| # Check if we have enough consecutive readings to confirm kickoff | |
| required_frames = self.config.opening_kickoff_min_consecutive_frames | |
| if self._state.opening_kickoff_consecutive_readings >= required_frames: | |
| logger.info( | |
| "Opening kickoff: %d consecutive clock readings confirmed (started at %.1fs)", | |
| self._state.opening_kickoff_consecutive_readings, | |
| self._state.opening_kickoff_candidate_timestamp, | |
| ) | |
| # Start tracking the opening kickoff using the candidate timestamp | |
| self._start_opening_kickoff(self._state.opening_kickoff_candidate_timestamp) | |
| else: | |
| logger.debug( | |
| "Opening kickoff: %d/%d consecutive readings (started at %.1fs)", | |
| self._state.opening_kickoff_consecutive_readings, | |
| required_frames, | |
| self._state.opening_kickoff_candidate_timestamp, | |
| ) | |
| def _start_opening_kickoff(self, timestamp: float) -> None: | |
| """ | |
| Start tracking the opening kickoff. | |
| Called when we get k consecutive valid clock readings (confirmed kickoff). | |
| The opening kickoff is special because the scorebug appears mid-play. | |
| Args: | |
| timestamp: When kickoff tracking should start (first of k consecutive readings) | |
| """ | |
| self._state.opening_kickoff_active = True | |
| self._state.first_clock_reading_timestamp = timestamp | |
| logger.info("Opening kickoff tracking started at %.1fs (%d consecutive clock readings)", timestamp, self.config.opening_kickoff_min_consecutive_frames) | |
| def _end_opening_kickoff(self, timestamp: float) -> Optional[PlayEvent]: | |
| """ | |
| End the opening kickoff and create the play event. | |
| Called when the first normal play starts (clock reset to 40 or countdown from 40). | |
| The kickoff spans from when we first got a clock reading until this moment. | |
| Note: Kickoff plays that are too long will be filtered out by PlayMerger | |
| using max_kickoff_duration filter. | |
| Args: | |
| timestamp: When the first normal play is starting | |
| Returns: | |
| PlayEvent for the opening kickoff | |
| """ | |
| # Kickoff starts when we first got a clock reading | |
| start_time = self._state.first_clock_reading_timestamp or timestamp | |
| # Kickoff ends just before the first normal play starts | |
| kickoff_end_time = timestamp - 0.5 | |
| # Ensure end time is after start time with reasonable duration | |
| if kickoff_end_time <= start_time: | |
| kickoff_end_time = start_time + 2.0 # Minimum 2 second duration for kickoffs | |
| self._play_count += 1 | |
| play = PlayEvent( | |
| play_number=self._play_count, | |
| start_time=start_time, | |
| end_time=kickoff_end_time, | |
| confidence=0.75, # Lower confidence since we can't verify exact boundaries | |
| start_method="opening_kickoff", | |
| end_method="first_play_start", | |
| direct_end_time=timestamp, | |
| start_clock_value=None, | |
| end_clock_value=40, | |
| play_type="kickoff", | |
| ) | |
| # Mark opening kickoff as complete | |
| self._state.opening_kickoff_active = False | |
| self._state.opening_kickoff_complete = True | |
| logger.info( | |
| "Opening kickoff (Play #%d): %.1fs - %.1fs (duration: %.1fs)", | |
| play.play_number, | |
| play.start_time, | |
| play.end_time, | |
| play.end_time - play.start_time, | |
| ) | |
| return play | |
| # ========================================================================= | |
| # Freeze tracking helpers (Fix 1.1: Clock freeze → reset detection) | |
| # ========================================================================= | |
| def _reset_freeze_tracking(self) -> None: | |
| """Reset clock freeze tracking state.""" | |
| self._state.clock_freeze_start_timestamp = None | |
| self._state.clock_freeze_value = None | |
| def _get_freeze_duration(self, current_timestamp: float) -> float: | |
| """ | |
| Get how long the clock has been frozen at the current value. | |
| Args: | |
| current_timestamp: Current video timestamp | |
| Returns: | |
| Duration in seconds the clock has been frozen, or 0 if not frozen | |
| """ | |
| if self._state.clock_freeze_start_timestamp is None: | |
| return 0.0 | |
| return current_timestamp - self._state.clock_freeze_start_timestamp | |
| def _update_freeze_tracking(self, timestamp: float, clock_value: int) -> None: | |
| """ | |
| Update freeze tracking when clock value changes. | |
| For detecting special plays, we track when the clock stays at a low value. | |
| When the clock value changes, we reset the freeze start time to now. | |
| Args: | |
| timestamp: Current video timestamp | |
| clock_value: New clock value | |
| """ | |
| # Track when clock is at a low value (≤24) - potential freeze before special play | |
| if clock_value <= 24: | |
| self._state.clock_freeze_start_timestamp = timestamp | |
| self._state.clock_freeze_value = clock_value | |
| else: | |
| # Clock is at high value (25+), reset freeze tracking | |
| self._reset_freeze_tracking() | |
| def _request_special_handoff_from_freeze( | |
| self, | |
| timestamp: float, | |
| timeout_info: Optional[TimeoutInfo], | |
| freeze_duration: float, | |
| ) -> None: | |
| """ | |
| Request handoff to SpecialPlayTracker for a clock freeze → 25 transition. | |
| This is similar to _request_special_handoff but for the case where | |
| clock jumped from a low frozen value directly to 25 (no 40 seen). | |
| Args: | |
| timestamp: When the transition to 25 occurred | |
| timeout_info: Current timeout indicator information | |
| freeze_duration: How long the clock was frozen before jumping to 25 | |
| """ | |
| # Use current timeout_info if valid, otherwise use last known confident values | |
| if timeout_info and timeout_info.confidence >= 0.5: | |
| home_at_40 = timeout_info.home_timeouts | |
| away_at_40 = timeout_info.away_timeouts | |
| conf_at_40 = timeout_info.confidence | |
| else: | |
| home_at_40 = self._state.last_home_timeouts | |
| away_at_40 = self._state.last_away_timeouts | |
| conf_at_40 = self._state.last_timeout_confidence | |
| handoff = SpecialPlayHandoff( | |
| transition_timestamp=timestamp, | |
| home_timeouts_at_40=home_at_40, | |
| away_timeouts_at_40=away_at_40, | |
| timeout_confidence_at_40=conf_at_40, | |
| was_in_play=False, # Not in a play, this is from PRE_SNAP | |
| play_start_time=None, | |
| time_at_40=0.0, # Not applicable for freeze detection | |
| from_freeze_detection=True, # Mark this as freeze detection so it's treated as special play | |
| ) | |
| self._state.request_special_handoff = True | |
| self._state.pending_handoff = handoff | |
| logger.info( | |
| "Requested handoff for clock freeze→25 at %.1fs (freeze_duration=%.1fs, from_value=%s)", | |
| timestamp, | |
| freeze_duration, | |
| self._state.clock_freeze_value, | |
| ) | |
| def _reset_state(self) -> None: | |
| """Fully reset state machine.""" | |
| self._state.state = PlayState.IDLE | |
| self._reset_play_tracking() | |
| self._state.last_clock_value = None | |
| self._state.last_clock_timestamp = None | |
| self._state.last_scorebug_timestamp = None | |
| self._reset_freeze_tracking() | |
| # Reset opening kickoff active flag (but NOT complete - that's permanent) | |
| self._state.opening_kickoff_active = False | |
| # Reset consecutive reading tracking (but NOT complete) | |
| self._state.opening_kickoff_consecutive_readings = 0 | |
| self._state.opening_kickoff_candidate_timestamp = None | |
| logger.debug("State machine reset to IDLE") | |
| # ========================================================================= | |
| # Play count management (for parent tracker coordination) | |
| # ========================================================================= | |
| def set_play_count(self, count: int) -> None: | |
| """Set the play count (used by parent tracker for coordination).""" | |
| self._play_count = count | |
| def get_play_count(self) -> int: | |
| """Get the current play count.""" | |
| return self._play_count | |