# cfb40/src/tracking/clock_reset_identifier.py
# andytaylor-smg's picture
# flag detection was pretty easy
# fbeda03
"""
Clock reset identifier module for post-hoc 40→25 transition analysis.

This module identifies and classifies 40→25 play clock reset events by analyzing
frame data after the initial extraction pass. It complements the real-time
TrackPlayState by catching timeout and special plays that the state machine
may miss or classify differently.

Classification (Class A/B/C):
- Class A (weird_clock): 25 counts down immediately → rejected (false positive)
- Class B (timeout): Timeout indicator changed → tracked as timeout play
- Class C (special): Neither A nor B → special play (punt/FG/XP/injury)
"""
import logging
from typing import Any, Dict, List, Optional, Tuple
from .models import PlayEvent, determine_timeout_team
logger = logging.getLogger(__name__)
# pylint: disable=too-few-public-methods
class ClockResetIdentifier:
    """
    Identifies and classifies 40→25 clock reset events from frame data.

    This performs post-hoc analysis on extracted frame data to find timeout
    and special plays by looking for 40→25 clock transitions and classifying
    them based on subsequent behavior and timeout indicator changes.

    Each frame is a dict. The keys read by this class are ``timestamp``
    (required) and, optionally, ``clock_value``, ``home_timeouts``,
    ``away_timeouts``, ``timeout_confidence``, ``clock_detected`` and
    ``scorebug_detected``.
    """

    # Minimum confidence threshold for reliable timeout readings
    MIN_TIMEOUT_CONFIDENCE = 0.5

    # Delay (seconds) after 40→25 before checking timeout indicators
    # Must be long enough for the scorebug timeout indicator to update (typically 4-8 seconds)
    TIMEOUT_CHECK_DELAY = 4.0

    # Maximum time (seconds) after 40→25 to look for timeout indicator changes
    # Extended to 10s to catch slow-updating indicators (some take 8+ seconds)
    TIMEOUT_CHECK_MAX_DELAY = 10.0

    # Exclusive look-back offset (in frames) for the "before" timeout reading:
    # frames frame_idx-1 down to frame_idx-(TIMEOUT_LOOKBACK_LIMIT-1) are examined.
    TIMEOUT_LOOKBACK_LIMIT = 20

    def __init__(
        self,
        immediate_countdown_window: float = 2.0,
        special_play_extension: float = 10.0,
        timeout_max_duration: float = 15.0,
    ):
        """
        Initialize the clock reset identifier.

        Args:
            immediate_countdown_window: Seconds to check if 25 counts down (Class A filter)
            special_play_extension: Max duration for special plays (Class C)
            timeout_max_duration: Max duration for timeout plays (Class B)
        """
        self.immediate_countdown_window = immediate_countdown_window
        self.special_play_extension = special_play_extension
        self.timeout_max_duration = timeout_max_duration

    def identify(self, frame_data: List[Dict[str, Any]]) -> Tuple[List["PlayEvent"], Dict[str, int]]:
        """
        Identify and classify 40→25 clock reset events in frame data.

        Scans through frame_data looking for 40→25 transitions and classifies each:
        - Class A (weird_clock): 25 counts down immediately → rejected
        - Class B (timeout): Timeout indicator changed → timeout play
        - Class C (special): Neither A nor B → special play

        Frames whose ``clock_value`` is missing/None are skipped without
        resetting the last known clock value, so a 40→25 transition is still
        recognized across a gap of unreadable frames.

        Args:
            frame_data: List of frame data dicts with clock_value, timestamp,
                home_timeouts, away_timeouts, etc.

        Returns:
            Tuple of (list of PlayEvent for valid clock resets, stats dict).
            The stats dict has the keys "total", "weird_clock", "timeout"
            and "special".

        Raises:
            KeyError: If a frame has no "timestamp" key.
        """
        plays: List["PlayEvent"] = []
        stats = {"total": 0, "weird_clock": 0, "timeout": 0, "special": 0}
        prev_clock: Optional[int] = None

        for i, frame in enumerate(frame_data):
            clock_value = frame.get("clock_value")
            timestamp: float = frame["timestamp"]

            if clock_value is None:
                # Unreadable clock: keep the last known value and move on.
                continue

            is_reset = prev_clock == 40 and clock_value == 25
            start_clock = prev_clock
            prev_clock = clock_value
            if not is_reset:
                continue

            stats["total"] += 1

            # Class A: 25 immediately counts down (weird clock behavior) - reject.
            # Checked first so rejected resets never trigger the timeout scan.
            if self._check_immediate_countdown(frame_data, i):
                stats["weird_clock"] += 1
                logger.debug("Clock reset at %.1fs: weird_clock (25 counts down immediately)", timestamp)
                continue

            # Class B: timeout indicator changed.
            timeout_team = self._check_timeout_change(frame_data, i)
            if timeout_team:
                stats["timeout"] += 1
                play_end = self._find_play_end(frame_data, i, max_duration=self.timeout_max_duration)
                play = self._build_play(
                    start_time=timestamp,
                    end_time=play_end,
                    start_method=f"timeout_{timeout_team}",
                    end_method="timeout_end",
                    start_clock_value=start_clock,
                    play_type="timeout",
                )
                plays.append(play)
                logger.debug("Clock reset at %.1fs: timeout (%s team)", timestamp, timeout_team)
                continue

            # Class C: Special play (punt/FG/XP/injury).
            stats["special"] += 1
            play_end = self._find_play_end(frame_data, i, max_duration=self.special_play_extension)
            play_duration = play_end - timestamp
            # A duration within 0.1s of the cap is treated as having hit the cap.
            end_method = "max_duration" if play_duration >= self.special_play_extension - 0.1 else "scorebug_disappeared"
            play = self._build_play(
                start_time=timestamp,
                end_time=play_end,
                start_method="clock_reset_special",
                end_method=end_method,
                start_clock_value=start_clock,
                play_type="special",
            )
            plays.append(play)
            logger.debug("Clock reset at %.1fs: special play (%.1fs duration)", timestamp, play_duration)

        return plays, stats

    @staticmethod
    def _build_play(
        start_time: float,
        end_time: float,
        start_method: str,
        end_method: str,
        start_clock_value: Optional[int],
        play_type: str,
    ) -> "PlayEvent":
        """
        Build the PlayEvent for an accepted clock reset.

        Fields shared by every clock-reset play are fixed here:
        play_number=0 (NOTE(review): presumably a placeholder assigned later
        by the caller - confirm), confidence=0.8, end_clock_value=25, and
        direct_end_time equal to end_time.

        Args:
            start_time: Timestamp of the frame where the reset was seen
            end_time: Play end timestamp
            start_method: Label describing how the start was detected
            end_method: Label describing how the end was determined
            start_clock_value: Clock value shown before the reset
            play_type: "timeout" or "special"

        Returns:
            The constructed PlayEvent
        """
        return PlayEvent(
            play_number=0,
            start_time=start_time,
            end_time=end_time,
            confidence=0.8,
            start_method=start_method,
            end_method=end_method,
            direct_end_time=end_time,
            start_clock_value=start_clock_value,
            end_clock_value=25,
            play_type=play_type,
        )

    def _check_immediate_countdown(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> bool:
        """
        Check if 25 immediately starts counting down (Class A filter).

        If the clock shows a value < 25 within the countdown window after
        the reset, this indicates weird clock behavior (false positive).

        Args:
            frame_data: Frame data list
            frame_idx: Index of frame where 40→25 reset occurred

        Returns:
            True if 25 counts down immediately (Class A), False otherwise
        """
        reset_timestamp: float = frame_data[frame_idx]["timestamp"]

        for j in range(frame_idx + 1, len(frame_data)):
            frame = frame_data[j]
            if frame["timestamp"] - reset_timestamp > self.immediate_countdown_window:
                break

            clock_value = frame.get("clock_value")
            if clock_value is not None and clock_value < 25:
                return True  # 25 counted down - weird clock

        return False

    def _timeout_reading(self, frame: Dict[str, Any]) -> Optional[Tuple[int, int, float]]:
        """
        Extract a reliable timeout reading from a single frame.

        A reading is reliable when both timeout counts are present and the
        frame's timeout_confidence is at least MIN_TIMEOUT_CONFIDENCE. A
        missing or None confidence counts as 0.0.

        Args:
            frame: Frame data dict

        Returns:
            (home_timeouts, away_timeouts, confidence), or None if the frame
            has no reliable reading
        """
        home = frame.get("home_timeouts")
        away = frame.get("away_timeouts")
        conf = frame.get("timeout_confidence") or 0.0
        if home is None or away is None or conf < self.MIN_TIMEOUT_CONFIDENCE:
            return None
        return home, away, conf

    def _timeout_reading_before(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> Optional[Tuple[int, int, float]]:
        """
        Find the most recent reliable timeout reading before the reset frame.

        Walks backwards from frame_idx - 1 and stops before
        frame_idx - TIMEOUT_LOOKBACK_LIMIT (exclusive) or the start of the
        data, whichever comes first.

        Args:
            frame_data: Frame data list
            frame_idx: Index of frame where 40→25 reset occurred

        Returns:
            (home_timeouts, away_timeouts, confidence), or None if no reliable
            reading exists in the look-back window
        """
        # The stop is exclusive, so it must be -1 (not 0) for index 0 to be examined.
        stop = max(-1, frame_idx - self.TIMEOUT_LOOKBACK_LIMIT)
        for j in range(frame_idx - 1, stop, -1):
            reading = self._timeout_reading(frame_data[j])
            if reading is not None:
                return reading
        return None

    def _timeout_reading_after(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> Optional[Tuple[int, int, float]]:
        """
        Find the first reliable timeout reading in the post-reset window.

        Only frames whose timestamp lies between TIMEOUT_CHECK_DELAY and
        TIMEOUT_CHECK_MAX_DELAY seconds (both inclusive) after the reset are
        considered.

        Args:
            frame_data: Frame data list
            frame_idx: Index of frame where 40→25 reset occurred

        Returns:
            (home_timeouts, away_timeouts, confidence), or None if no reliable
            reading exists in the window
        """
        reset_timestamp: float = frame_data[frame_idx]["timestamp"]
        earliest = reset_timestamp + self.TIMEOUT_CHECK_DELAY
        latest = reset_timestamp + self.TIMEOUT_CHECK_MAX_DELAY

        for j in range(frame_idx + 1, len(frame_data)):
            frame = frame_data[j]
            timestamp: float = frame["timestamp"]

            # Only check after the delay period
            if timestamp < earliest:
                continue
            # Stop if we've gone past the search window
            if timestamp > latest:
                break

            reading = self._timeout_reading(frame)
            if reading is not None:
                return reading
        return None

    def _check_timeout_change(self, frame_data: List[Dict[str, Any]], frame_idx: int) -> Optional[str]:
        """
        Check if a timeout indicator changed around the reset (Class B check).

        Compares the last reliable timeout counts before the reset with the
        first reliable counts seen after the post-reset delay. The per-team
        decreases are passed to determine_timeout_team, which makes the
        decision. NOTE(review): that helper is defined outside this file;
        it is expected to accept only "exactly one team's count decreased by
        exactly 1 while the other stayed the same" - confirm against .models.

        Args:
            frame_data: Frame data list
            frame_idx: Index of frame where 40→25 reset occurred

        Returns:
            "home" or "away" if timeout was used, None otherwise (including
            when a reliable before or after reading is unavailable)
        """
        before = self._timeout_reading_before(frame_data, frame_idx)
        if before is None:
            return None

        after = self._timeout_reading_after(frame_data, frame_idx)
        if after is None:
            return None

        before_home, before_away, before_conf = before
        after_home, after_away, after_conf = after

        # Calculate changes and determine which team called timeout (if any)
        home_change = before_home - after_home  # positive = decrease
        away_change = before_away - after_away  # positive = decrease
        timeout_team = determine_timeout_team(home_change, away_change)

        if timeout_team:
            logger.debug(
                "Timeout detected: %s team (before=(%d,%d) conf=%.2f, after=(%d,%d) conf=%.2f)",
                timeout_team,
                before_home,
                before_away,
                before_conf,
                after_home,
                after_away,
                after_conf,
            )
        return timeout_team

    def _find_play_end(self, frame_data: List[Dict[str, Any]], frame_idx: int, max_duration: float) -> float:
        """
        Find the end time for a clock reset play.

        The play ends when EITHER:
        - Scorebug/clock disappears (cut to commercial/replay)
        - max_duration seconds have elapsed since the reset
        Whichever comes first. If the data runs out before either happens,
        the play ends at the last frame's timestamp.

        Args:
            frame_data: Frame data list
            frame_idx: Index of frame where 40→25 reset occurred
            max_duration: Maximum play duration from reset

        Returns:
            Play end timestamp
        """
        start_timestamp: float = frame_data[frame_idx]["timestamp"]
        max_end_time = start_timestamp + max_duration

        # Look for scorebug disappearance (but cap at max_duration)
        for j in range(frame_idx + 1, len(frame_data)):
            frame = frame_data[j]
            timestamp: float = frame["timestamp"]

            # If we've exceeded max_duration, end at max_duration
            if timestamp >= max_end_time:
                return max_end_time

            # "clock_detected" takes precedence; "scorebug_detected" is the
            # fallback, and a frame with neither key counts as not available.
            clock_available = frame.get("clock_detected", frame.get("scorebug_detected", False))
            if not clock_available:
                return timestamp

        # Default: end at max_duration (or end of data if shorter).
        # frame_data is non-empty here because frame_data[frame_idx] was read above.
        return min(max_end_time, float(frame_data[-1]["timestamp"]))