Spaces:
Sleeping
Sleeping
| """ | |
| Clock reset identifier module for post-hoc 40β25 transition analysis. | |
| This module identifies and classifies 40β25 play clock reset events by analyzing | |
| frame data after the initial extraction pass. It complements the real-time | |
| TrackPlayState by catching timeout and special plays that the state machine | |
| may miss or classify differently. | |
| Classification (Class A/B/C): | |
| - Class A (weird_clock): 25 counts down immediately β rejected (false positive) | |
| - Class B (timeout): Timeout indicator changed β tracked as timeout play | |
| - Class C (special): Neither A nor B β special play (punt/FG/XP/injury) | |
| """ | |
| import logging | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from .models import PlayEvent, determine_timeout_team | |
| logger = logging.getLogger(__name__) | |
| # pylint: disable=too-few-public-methods | |
| class ClockResetIdentifier: | |
| """ | |
| Identifies and classifies 40β25 clock reset events from frame data. | |
| This performs post-hoc analysis on extracted frame data to find timeout | |
| and special plays by looking for 40β25 clock transitions and classifying | |
| them based on subsequent behavior and timeout indicator changes. | |
| """ | |
| def __init__( | |
| self, | |
| immediate_countdown_window: float = 2.0, | |
| special_play_extension: float = 10.0, | |
| timeout_max_duration: float = 15.0, | |
| ): | |
| """ | |
| Initialize the clock reset identifier. | |
| Args: | |
| immediate_countdown_window: Seconds to check if 25 counts down (Class A filter) | |
| special_play_extension: Max duration for special plays (Class C) | |
| timeout_max_duration: Max duration for timeout plays (Class B) | |
| """ | |
| self.immediate_countdown_window = immediate_countdown_window | |
| self.special_play_extension = special_play_extension | |
| self.timeout_max_duration = timeout_max_duration | |
| def identify(self, frame_data: List[Dict[str, Any]]) -> Tuple[List[PlayEvent], Dict[str, int]]: | |
| """ | |
| Identify and classify 40β25 clock reset events in frame data. | |
| Scans through frame_data looking for 40β25 transitions and classifies each: | |
| - Class A (weird_clock): 25 counts down immediately β rejected | |
| - Class B (timeout): Timeout indicator changed β timeout play | |
| - Class C (special): Neither A nor B β special play | |
| Args: | |
| frame_data: List of frame data dicts with clock_value, timestamp, | |
| home_timeouts, away_timeouts, etc. | |
| Returns: | |
| Tuple of (list of PlayEvent for valid clock resets, stats dict) | |
| """ | |
| plays: List[PlayEvent] = [] | |
| stats = {"total": 0, "weird_clock": 0, "timeout": 0, "special": 0} | |
| prev_clock: Optional[int] = None | |
| for i, frame in enumerate(frame_data): | |
| clock_value = frame.get("clock_value") | |
| timestamp: float = frame["timestamp"] | |
| if clock_value is not None: | |
| # Identify 40 β 25 transition | |
| if prev_clock == 40 and clock_value == 25: | |
| stats["total"] += 1 | |
| # Check Class A: 25 immediately counts down (weird clock behavior) | |
| is_immediate_countdown = self._check_immediate_countdown(frame_data, i) | |
| # Check Class B: timeout indicator changed | |
| timeout_team = self._check_timeout_change(frame_data, i) | |
| if is_immediate_countdown: | |
| # Class A: Weird clock behavior - reject | |
| stats["weird_clock"] += 1 | |
| logger.debug("Clock reset at %.1fs: weird_clock (25 counts down immediately)", timestamp) | |
| elif timeout_team: | |
| # Class B: Team timeout | |
| stats["timeout"] += 1 | |
| play_end = self._find_play_end(frame_data, i, max_duration=self.timeout_max_duration) | |
| play = PlayEvent( | |
| play_number=0, | |
| start_time=timestamp, | |
| end_time=play_end, | |
| confidence=0.8, | |
| start_method=f"timeout_{timeout_team}", | |
| end_method="timeout_end", | |
| direct_end_time=play_end, | |
| start_clock_value=prev_clock, | |
| end_clock_value=25, | |
| play_type="timeout", | |
| ) | |
| plays.append(play) | |
| logger.debug("Clock reset at %.1fs: timeout (%s team)", timestamp, timeout_team) | |
| else: | |
| # Class C: Special play (punt/FG/XP/injury) | |
| stats["special"] += 1 | |
| play_end = self._find_play_end(frame_data, i, max_duration=self.special_play_extension) | |
| play_duration = play_end - timestamp | |
| end_method = "max_duration" if play_duration >= self.special_play_extension - 0.1 else "scorebug_disappeared" | |
| play = PlayEvent( | |
| play_number=0, | |
| start_time=timestamp, | |
| end_time=play_end, | |
| confidence=0.8, | |
| start_method="clock_reset_special", | |
| end_method=end_method, | |
| direct_end_time=play_end, | |
| start_clock_value=prev_clock, | |
| end_clock_value=25, | |
| play_type="special", | |
| ) | |
| plays.append(play) | |
| logger.debug("Clock reset at %.1fs: special play (%.1fs duration)", timestamp, play_end - timestamp) | |
| prev_clock = clock_value | |
| return plays, stats | |
| def _check_immediate_countdown(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> bool: | |
| """ | |
| Check if 25 immediately starts counting down (Class A filter). | |
| If the clock shows a value < 25 within the countdown window after | |
| the reset, this indicates weird clock behavior (false positive). | |
| Args: | |
| frame_data: Frame data list | |
| frame_idx: Index of frame where 40β25 reset occurred | |
| Returns: | |
| True if 25 counts down immediately (Class A), False otherwise | |
| """ | |
| reset_timestamp: float = frame_data[frame_idx]["timestamp"] | |
| for j in range(frame_idx + 1, len(frame_data)): | |
| frame = frame_data[j] | |
| elapsed = frame["timestamp"] - reset_timestamp | |
| if elapsed > self.immediate_countdown_window: | |
| break | |
| clock_value = frame.get("clock_value") | |
| if clock_value is not None and clock_value < 25: | |
| return True # 25 counted down - weird clock | |
| return False | |
| # Minimum confidence threshold for reliable timeout readings | |
| MIN_TIMEOUT_CONFIDENCE = 0.5 | |
| # Delay (seconds) after 40β25 before checking timeout indicators | |
| # Must be long enough for the scorebug timeout indicator to update (typically 4-8 seconds) | |
| TIMEOUT_CHECK_DELAY = 4.0 | |
| # Maximum time (seconds) after 40β25 to look for timeout indicator changes | |
| # Extended to 10s to catch slow-updating indicators (some take 8+ seconds) | |
| TIMEOUT_CHECK_MAX_DELAY = 10.0 | |
| def _check_timeout_change(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> Optional[str]: | |
| """ | |
| Check if a timeout indicator changed around the reset (Class B check). | |
| Compares timeout counts before and after the reset to determine | |
| if a team timeout was called. | |
| Validation rules for a valid timeout: | |
| 1. Exactly ONE team's count decreased by exactly 1 | |
| 2. Other team's count stayed the same | |
| 3. Both before and after readings have confidence >= MIN_TIMEOUT_CONFIDENCE | |
| Args: | |
| frame_data: Frame data list | |
| frame_idx: Index of frame where 40β25 reset occurred | |
| Returns: | |
| "home" or "away" if timeout was used, None otherwise | |
| """ | |
| reset_timestamp: float = frame_data[frame_idx]["timestamp"] | |
| # Get timeout counts BEFORE reset (look back for high-confidence reading) | |
| before_home: Optional[int] = None | |
| before_away: Optional[int] = None | |
| before_conf: float = 0.0 | |
| for j in range(frame_idx - 1, max(0, frame_idx - 20), -1): | |
| frame = frame_data[j] | |
| conf = frame.get("timeout_confidence", 0.0) | |
| if frame.get("home_timeouts") is not None and conf >= self.MIN_TIMEOUT_CONFIDENCE: | |
| before_home = frame.get("home_timeouts") | |
| before_away = frame.get("away_timeouts") | |
| before_conf = conf | |
| break | |
| if before_home is None or before_away is None: | |
| return None | |
| # Look forward for timeout change AFTER DELAY (4-10 seconds after reset) | |
| target_time = reset_timestamp + self.TIMEOUT_CHECK_DELAY | |
| max_time = reset_timestamp + self.TIMEOUT_CHECK_MAX_DELAY | |
| after_home: Optional[int] = None | |
| after_away: Optional[int] = None | |
| after_conf: float = 0.0 | |
| for j in range(frame_idx + 1, len(frame_data)): | |
| frame = frame_data[j] | |
| timestamp: float = frame["timestamp"] | |
| # Only check after the delay period | |
| if timestamp < target_time: | |
| continue | |
| # Stop if we've gone past the search window | |
| if timestamp > max_time: | |
| break | |
| conf = frame.get("timeout_confidence", 0.0) | |
| if frame.get("home_timeouts") is not None and conf >= self.MIN_TIMEOUT_CONFIDENCE: | |
| after_home = frame.get("home_timeouts") | |
| after_away = frame.get("away_timeouts") | |
| after_conf = conf | |
| break | |
| if after_home is None or after_away is None: | |
| return None | |
| # Calculate changes and determine which team called timeout (if any) | |
| home_change = before_home - after_home # positive = decrease | |
| away_change = before_away - after_away # positive = decrease | |
| timeout_team = determine_timeout_team(home_change, away_change) | |
| if timeout_team: | |
| logger.debug( | |
| "Timeout detected: %s team (before=(%d,%d) conf=%.2f, after=(%d,%d) conf=%.2f)", | |
| timeout_team, | |
| before_home, | |
| before_away, | |
| before_conf, | |
| after_home, | |
| after_away, | |
| after_conf, | |
| ) | |
| return timeout_team | |
| def _find_play_end(self, frame_data: List[Dict[str, Any]], frame_idx: int, max_duration: float) -> float: | |
| """ | |
| Find the end time for a clock reset play. | |
| The play ends when EITHER: | |
| - Scorebug/clock disappears (cut to commercial/replay) | |
| - max_duration seconds have elapsed since the reset | |
| Whichever comes first. | |
| Args: | |
| frame_data: Frame data list | |
| frame_idx: Index of frame where 40β25 reset occurred | |
| max_duration: Maximum play duration from reset | |
| Returns: | |
| Play end timestamp | |
| """ | |
| start_timestamp: float = frame_data[frame_idx]["timestamp"] | |
| max_end_time = start_timestamp + max_duration | |
| # Look for scorebug disappearance (but cap at max_duration) | |
| for j in range(frame_idx + 1, len(frame_data)): | |
| frame = frame_data[j] | |
| timestamp: float = frame["timestamp"] | |
| # If we've exceeded max_duration, end at max_duration | |
| if timestamp >= max_end_time: | |
| return max_end_time | |
| # Check for clock/scorebug disappearance | |
| clock_available = frame.get("clock_detected", frame.get("scorebug_detected", False)) | |
| if not clock_available: | |
| return timestamp | |
| # Default: end at max_duration (or end of data if shorter) | |
| return min(max_end_time, float(frame_data[-1]["timestamp"]) if frame_data else max_end_time) | |