andytaylor-smg commited on
Commit
72dca15
·
1 Parent(s): fbeda03

Fixing mypy

Browse files
src/detection/timeout_calibrator.py CHANGED
@@ -14,7 +14,7 @@ The calibration process:
14
  """
15
 
16
  import logging
17
- from typing import Any, List, Optional, Tuple
18
 
19
  import cv2
20
  import numpy as np
@@ -86,6 +86,7 @@ def calibrate_timeout_ovals(
86
 
87
  return calibrated
88
 
 
89
  # pylint: disable=too-many-locals
90
  def _find_bright_ovals(roi: np.ndarray[Any, Any]) -> List[OvalLocation]:
91
  """
@@ -148,7 +149,9 @@ def _find_bright_ovals(roi: np.ndarray[Any, Any]) -> List[OvalLocation]:
148
  # Calculate mean brightness of the contour region
149
  mask = np.zeros(gray.shape, dtype=np.uint8)
150
  cv2.drawContours(mask, [contour], -1, 255, -1)
151
- mean_brightness = cv2.mean(gray, mask=mask)[0]
 
 
152
 
153
  # Only keep if significantly bright
154
  if mean_brightness < 100:
@@ -168,8 +171,11 @@ def _find_bright_ovals(roi: np.ndarray[Any, Any]) -> List[OvalLocation]:
168
 
169
  logger.debug("Found %d candidate ovals in region", len(ovals))
170
  return ovals
 
 
171
  # pylint: enable=too-many-locals
172
 
 
173
  def _validate_oval_pattern(ovals: List[OvalLocation]) -> bool:
174
  """
175
  Validate that ovals have consistent spacing (symmetry check).
@@ -259,7 +265,7 @@ def check_oval_brightness(
259
 
260
  # Convert to grayscale and calculate mean brightness
261
  gray = cv2.cvtColor(oval_roi, cv2.COLOR_BGR2GRAY)
262
- current_brightness = float(np.mean(gray))
263
 
264
  # Compare to baseline
265
  threshold = oval.baseline_brightness * brightness_threshold_ratio
 
14
  """
15
 
16
  import logging
17
+ from typing import Any, List, Optional, Tuple, cast
18
 
19
  import cv2
20
  import numpy as np
 
86
 
87
  return calibrated
88
 
89
+
90
  # pylint: disable=too-many-locals
91
  def _find_bright_ovals(roi: np.ndarray[Any, Any]) -> List[OvalLocation]:
92
  """
 
149
  # Calculate mean brightness of the contour region
150
  mask = np.zeros(gray.shape, dtype=np.uint8)
151
  cv2.drawContours(mask, [contour], -1, 255, -1)
152
+ # cv2.mean returns a tuple of 4 floats (per channel); extract the first channel
153
+ mean_brightness_tuple = cast(Tuple[float, float, float, float], cv2.mean(gray, mask=mask))
154
+ mean_brightness = mean_brightness_tuple[0]
155
 
156
  # Only keep if significantly bright
157
  if mean_brightness < 100:
 
171
 
172
  logger.debug("Found %d candidate ovals in region", len(ovals))
173
  return ovals
174
+
175
+
176
  # pylint: enable=too-many-locals
177
 
178
+
179
  def _validate_oval_pattern(ovals: List[OvalLocation]) -> bool:
180
  """
181
  Validate that ovals have consistent spacing (symmetry check).
 
265
 
266
  # Convert to grayscale and calculate mean brightness
267
  gray = cv2.cvtColor(oval_roi, cv2.COLOR_BGR2GRAY)
268
+ current_brightness = float(np.mean(np.asarray(gray)))
269
 
270
  # Compare to baseline
271
  threshold = oval.baseline_brightness * brightness_threshold_ratio
src/detection/timeouts.py CHANGED
@@ -513,7 +513,7 @@ class CalibratedTimeoutDetector:
513
 
514
  # Convert to grayscale and calculate mean brightness
515
  gray = cv2.cvtColor(oval_roi, cv2.COLOR_BGR2GRAY)
516
- current_brightness = float(np.mean(gray))
517
 
518
  # Compare to baseline
519
  threshold = oval.baseline_brightness * self.BRIGHTNESS_THRESHOLD_RATIO
 
513
 
514
  # Convert to grayscale and calculate mean brightness
515
  gray = cv2.cvtColor(oval_roi, cv2.COLOR_BGR2GRAY)
516
+ current_brightness = float(np.mean(np.asarray(gray)))
517
 
518
  # Compare to baseline
519
  threshold = oval.baseline_brightness * self.BRIGHTNESS_THRESHOLD_RATIO
src/pipeline/parallel.py CHANGED
@@ -13,8 +13,9 @@ import time
13
  from concurrent.futures import Future, ProcessPoolExecutor, as_completed
14
  from multiprocessing import Manager
15
  from pathlib import Path
16
- from typing import Any, Dict, List, MutableMapping, Optional, Tuple
17
 
 
18
  from utils import create_frame_result
19
  from .models import ChunkResult, ParallelProcessingConfig
20
 
@@ -25,6 +26,7 @@ logger = logging.getLogger(__name__)
25
  # Chunk Processing Helper Functions (run in subprocess)
26
  # =============================================================================
27
 
 
28
  # pylint: disable=too-many-locals
29
  def _init_chunk_detectors(config: ParallelProcessingConfig) -> Tuple[Any, Any, Any, Any, Any]:
30
  """
@@ -76,10 +78,9 @@ def _init_chunk_detectors(config: ParallelProcessingConfig) -> Tuple[Any, Any, A
76
 
77
  # Initialize timeout tracker if config provided
78
  # Try calibrated detector first (with oval positions), fall back to legacy
79
- timeout_tracker = None
80
- if config.timeout_config_path and Path(config.timeout_config_path).exists():
81
- from detection.timeouts import CalibratedTimeoutDetector
82
 
 
 
83
  # Try to load as calibrated detector first
84
  calibrated = CalibratedTimeoutDetector(config_path=config.timeout_config_path)
85
  if calibrated.is_configured():
@@ -99,8 +100,11 @@ def _init_chunk_detectors(config: ParallelProcessingConfig) -> Tuple[Any, Any, A
99
  )
100
 
101
  return scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader
 
 
102
  # pylint: enable=too-many-locals
103
 
 
104
  def _process_frame(
105
  img: Any,
106
  timestamp: float,
 
13
  from concurrent.futures import Future, ProcessPoolExecutor, as_completed
14
  from multiprocessing import Manager
15
  from pathlib import Path
16
+ from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Union
17
 
18
+ from detection.timeouts import CalibratedTimeoutDetector
19
  from utils import create_frame_result
20
  from .models import ChunkResult, ParallelProcessingConfig
21
 
 
26
  # Chunk Processing Helper Functions (run in subprocess)
27
  # =============================================================================
28
 
29
+
30
  # pylint: disable=too-many-locals
31
  def _init_chunk_detectors(config: ParallelProcessingConfig) -> Tuple[Any, Any, Any, Any, Any]:
32
  """
 
78
 
79
  # Initialize timeout tracker if config provided
80
  # Try calibrated detector first (with oval positions), fall back to legacy
 
 
 
81
 
82
+ timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = None
83
+ if config.timeout_config_path and Path(config.timeout_config_path).exists():
84
  # Try to load as calibrated detector first
85
  calibrated = CalibratedTimeoutDetector(config_path=config.timeout_config_path)
86
  if calibrated.is_configured():
 
100
  )
101
 
102
  return scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader
103
+
104
+
105
  # pylint: enable=too-many-locals
106
 
107
+
108
  def _process_frame(
109
  img: Any,
110
  timestamp: float,
src/pipeline/play_extractor.py CHANGED
@@ -21,12 +21,13 @@ import json
21
  import logging
22
  import time
23
  from pathlib import Path
24
- from typing import Optional, List, Dict, Any, Tuple
25
 
26
  import cv2
27
  import numpy as np
28
 
29
  from detection import DetectScoreBug, ScorebugDetection, DetectTimeouts
 
30
  from readers import FlagReader, ReadPlayClock, PlayClockReading
31
  from setup import DigitTemplateBuilder, DigitTemplateLibrary, PlayClockRegionConfig, PlayClockRegionExtractor
32
  from tracking import FlagInfo, TrackPlayState, PlayEvent, PlayMerger, TimeoutInfo, ClockResetIdentifier
@@ -75,7 +76,7 @@ class PlayExtractor:
75
  - ClockResetIdentifier: Post-hoc identification of timeout/special plays
76
  """
77
 
78
- def __init__(self, config: DetectionConfig, timeout_tracker: Optional[DetectTimeouts] = None, flag_reader: Optional[FlagReader] = None):
79
  """
80
  Initialize the play extractor pipeline.
81
 
@@ -85,7 +86,7 @@ class PlayExtractor:
85
  flag_reader: Optional FLAG reader for penalty flag detection
86
  """
87
  self.config = config
88
- self.timeout_tracker = timeout_tracker
89
  self.flag_reader = flag_reader
90
 
91
  # Template-based clock reading components (conditionally initialized)
 
21
  import logging
22
  import time
23
  from pathlib import Path
24
+ from typing import Optional, List, Dict, Any, Tuple, Union
25
 
26
  import cv2
27
  import numpy as np
28
 
29
  from detection import DetectScoreBug, ScorebugDetection, DetectTimeouts
30
+ from detection.timeouts import CalibratedTimeoutDetector
31
  from readers import FlagReader, ReadPlayClock, PlayClockReading
32
  from setup import DigitTemplateBuilder, DigitTemplateLibrary, PlayClockRegionConfig, PlayClockRegionExtractor
33
  from tracking import FlagInfo, TrackPlayState, PlayEvent, PlayMerger, TimeoutInfo, ClockResetIdentifier
 
76
  - ClockResetIdentifier: Post-hoc identification of timeout/special plays
77
  """
78
 
79
+ def __init__(self, config: DetectionConfig, timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = None, flag_reader: Optional[FlagReader] = None):
80
  """
81
  Initialize the play extractor pipeline.
82
 
 
86
  flag_reader: Optional FLAG reader for penalty flag detection
87
  """
88
  self.config = config
89
+ self.timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = timeout_tracker
90
  self.flag_reader = flag_reader
91
 
92
  # Template-based clock reading components (conditionally initialized)
src/readers/flags.py CHANGED
@@ -158,7 +158,7 @@ class FlagReader:
158
  # Compute mean hue of yellow pixels
159
  if yellow_pixels > 0:
160
  yellow_hues = hsv[:, :, 0][mask > 0]
161
- mean_hue = float(np.mean(yellow_hues))
162
  else:
163
  mean_hue = 0.0
164
 
 
158
  # Compute mean hue of yellow pixels
159
  if yellow_pixels > 0:
160
  yellow_hues = hsv[:, :, 0][mask > 0]
161
+ mean_hue = float(np.mean(np.asarray(yellow_hues)))
162
  else:
163
  mean_hue = 0.0
164
 
src/tracking/flag_tracker.py CHANGED
@@ -177,7 +177,7 @@ class FlagTracker:
177
  # No scorebug (e.g., replay) - use gap tolerance
178
  return self._handle_no_scorebug(timestamp)
179
 
180
- def _handle_flag_detected(self, timestamp: float, flag_info: FlagInfo) -> Optional[PlayEvent]:
181
  """Handle FLAG being detected."""
182
  self._last_flag_seen_at = timestamp
183
 
@@ -197,6 +197,8 @@ class FlagTracker:
197
  if self._state.current_flag is not None:
198
  self._state.current_flag.update(flag_info.yellow_ratio, flag_info.mean_hue)
199
 
 
 
200
  def _handle_flag_cleared(self, timestamp: float) -> Optional[PlayEvent]:
201
  """Handle FLAG being cleared (scorebug visible, no FLAG)."""
202
  if not self._state.flag_active:
 
177
  # No scorebug (e.g., replay) - use gap tolerance
178
  return self._handle_no_scorebug(timestamp)
179
 
180
+ def _handle_flag_detected(self, timestamp: float, flag_info: FlagInfo) -> Optional[PlayEvent]: # pylint: disable=useless-return
181
  """Handle FLAG being detected."""
182
  self._last_flag_seen_at = timestamp
183
 
 
197
  if self._state.current_flag is not None:
198
  self._state.current_flag.update(flag_info.yellow_ratio, flag_info.mean_hue)
199
 
200
+ return None
201
+
202
  def _handle_flag_cleared(self, timestamp: float) -> Optional[PlayEvent]:
203
  """Handle FLAG being cleared (scorebug visible, no FLAG)."""
204
  if not self._state.flag_active:
src/tracking/play_state.py CHANGED
@@ -21,7 +21,7 @@ The API remains backward compatible with the original implementation.
21
  """
22
 
23
  import logging
24
- from typing import Optional, List
25
 
26
  from detection import ScorebugDetection
27
  from readers import PlayClockReading
@@ -124,7 +124,7 @@ class TrackPlayState:
124
  """Get current state."""
125
  return self._tracker.get_state()
126
 
127
- def get_stats(self) -> dict:
128
  """Get statistics about tracked plays."""
129
  return self._tracker.get_stats()
130
 
 
21
  """
22
 
23
  import logging
24
+ from typing import Any, Dict, List, Optional
25
 
26
  from detection import ScorebugDetection
27
  from readers import PlayClockReading
 
124
  """Get current state."""
125
  return self._tracker.get_state()
126
 
127
+ def get_stats(self) -> Dict[str, Any]:
128
  """Get statistics about tracked plays."""
129
  return self._tracker.get_stats()
130
 
src/tracking/play_tracker.py CHANGED
@@ -23,7 +23,7 @@ FLAG tracking runs in parallel:
23
  """
24
 
25
  import logging
26
- from typing import List, Optional
27
 
28
  from detection import ScorebugDetection
29
  from readers import PlayClockReading
@@ -33,6 +33,7 @@ from .models import (
33
  FlagInfo,
34
  PlayEvent,
35
  PlayState,
 
36
  TimeoutInfo,
37
  TrackerMode,
38
  TrackPlayStateConfig,
@@ -109,7 +110,7 @@ class PlayTracker:
109
  return self._special_tracker.stats
110
 
111
  @property
112
- def flag_tracker_stats(self) -> dict:
113
  """Statistics about FLAG tracking."""
114
  return self._flag_tracker.get_stats()
115
 
@@ -223,7 +224,7 @@ class PlayTracker:
223
  # Mode transitions
224
  # =========================================================================
225
 
226
- def _activate_special_mode(self, handoff) -> None:
227
  """
228
  Activate special mode to handle a 40→25 transition.
229
 
@@ -284,7 +285,7 @@ class PlayTracker:
284
  """Get current state of normal tracker."""
285
  return self._normal_tracker.state
286
 
287
- def get_stats(self) -> dict:
288
  """Get statistics about tracked plays."""
289
  if not self._plays:
290
  return {
 
23
  """
24
 
25
  import logging
26
+ from typing import Any, Dict, List, Optional
27
 
28
  from detection import ScorebugDetection
29
  from readers import PlayClockReading
 
33
  FlagInfo,
34
  PlayEvent,
35
  PlayState,
36
+ SpecialPlayHandoff,
37
  TimeoutInfo,
38
  TrackerMode,
39
  TrackPlayStateConfig,
 
110
  return self._special_tracker.stats
111
 
112
  @property
113
+ def flag_tracker_stats(self) -> Dict[str, Any]:
114
  """Statistics about FLAG tracking."""
115
  return self._flag_tracker.get_stats()
116
 
 
224
  # Mode transitions
225
  # =========================================================================
226
 
227
+ def _activate_special_mode(self, handoff: SpecialPlayHandoff) -> None:
228
  """
229
  Activate special mode to handle a 40→25 transition.
230
 
 
285
  """Get current state of normal tracker."""
286
  return self._normal_tracker.state
287
 
288
+ def get_stats(self) -> Dict[str, Any]:
289
  """Get statistics about tracked plays."""
290
  if not self._plays:
291
  return {
src/tracking/special_play_tracker.py CHANGED
@@ -54,7 +54,7 @@ class SpecialPlayTracker:
54
  # Timeout confirmation requirement
55
  MIN_CONSECUTIVE_TIMEOUT_CONFIRMATIONS = 3 # Require k consecutive frames showing same timeout decrease
56
 
57
- def __init__(self):
58
  """Initialize the special play tracker."""
59
  self._state = SpecialTrackerState()
60
  self._play_count = 0 # Running count for play numbering
@@ -224,8 +224,12 @@ class SpecialPlayTracker:
224
  # Check for timeout indicator change
225
  if timeout_info is not None and timeout_info.confidence >= 0.5:
226
  # Debug: log what we're seeing
227
- home_change = (self._state.home_timeouts_at_40 or 0) - timeout_info.home_timeouts
228
- away_change = (self._state.away_timeouts_at_40 or 0) - timeout_info.away_timeouts
 
 
 
 
229
  logger.debug(
230
  "SpecialPlayTracker check at %.1fs: before(H=%s,A=%s) after(H=%s,A=%s,conf=%.2f) | Δh=%+d Δa=%+d",
231
  timestamp,
 
54
  # Timeout confirmation requirement
55
  MIN_CONSECUTIVE_TIMEOUT_CONFIRMATIONS = 3 # Require k consecutive frames showing same timeout decrease
56
 
57
+ def __init__(self) -> None:
58
  """Initialize the special play tracker."""
59
  self._state = SpecialTrackerState()
60
  self._play_count = 0 # Running count for play numbering
 
224
  # Check for timeout indicator change
225
  if timeout_info is not None and timeout_info.confidence >= 0.5:
226
  # Debug: log what we're seeing
227
+ home_baseline = self._state.home_timeouts_at_40 if self._state.home_timeouts_at_40 is not None else 0
228
+ away_baseline = self._state.away_timeouts_at_40 if self._state.away_timeouts_at_40 is not None else 0
229
+ current_home = timeout_info.home_timeouts if timeout_info.home_timeouts is not None else 0
230
+ current_away = timeout_info.away_timeouts if timeout_info.away_timeouts is not None else 0
231
+ home_change = home_baseline - current_home
232
+ away_change = away_baseline - current_away
233
  logger.debug(
234
  "SpecialPlayTracker check at %.1fs: before(H=%s,A=%s) after(H=%s,A=%s,conf=%.2f) | Δh=%+d Δa=%+d",
235
  timestamp,