# src/pipeline/play_extractor.py
# (web file-viewer header removed; originally: commit "adding analyze tab", 9b5489a)
"""
Play extractor pipeline module.
This module orchestrates the complete play extraction pipeline:
1. Video frame extraction
2. Scorebug detection
3. Play clock reading via template matching
4. Play state machine processing
5. Post-hoc clock reset identification (timeout/special plays)
Performance optimizations:
- Streaming processing: read frame -> process immediately (no intermediate storage)
- Threaded video I/O: background thread reads frames while main thread processes
- Template matching for clock reading (~34x faster than OCR)
Note: OCR-based clock reading has been removed in favor of template matching.
See docs/ocr_to_template_migration.md for details.
"""
import json
import logging
import time
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple, Union
import cv2
import numpy as np
from detection import DetectScoreBug, ScorebugDetection, DetectTimeouts
from detection.timeouts import CalibratedTimeoutDetector
from readers import FlagReader, ReadPlayClock, PlayClockReading
from setup import DigitTemplateBuilder, DigitTemplateLibrary, PlayClockRegionConfig, PlayClockRegionExtractor
from tracking import FlagInfo, TrackPlayState, PlayEvent, PlayMerger, TimeoutInfo, ClockResetIdentifier
from utils import create_frame_result, log_flag_plays
from video import ThreadedFrameReader
from .models import DetectionConfig, DetectionResult, ParallelProcessingConfig, VideoContext
from .parallel import process_video_parallel
from .template_builder_pass import TemplateBuildingPass
logger = logging.getLogger(__name__)
def format_extraction_result_dict(result: DetectionResult) -> Dict[str, Any]:
    """
    Format a DetectionResult into a dictionary for JSON serialization or API return.

    Args:
        result: DetectionResult to format

    Returns:
        Dictionary with structured result data (segment bounds, processing
        counters, timing breakdown, plays and stats).
    """
    segment = {"start": result.segment_start, "end": result.segment_end}
    processing = {
        "total_frames": result.total_frames_processed,
        "frames_with_scorebug": result.frames_with_scorebug,
        "frames_with_clock": result.frames_with_clock,
    }
    return {
        "video": result.video,
        "segment": segment,
        "processing": processing,
        "timing": result.timing,
        "plays": result.plays,
        "stats": result.stats,
    }
class PlayExtractor:
    """
    Main pipeline for extracting plays from video.

    This class orchestrates all extraction components:
    - DetectScoreBug: Locates scorebug in frames
    - ReadPlayClock: Reads play clock digits via template matching
    - TrackPlayState: Determines play boundaries
    - DetectTimeouts: Tracks timeout indicators for 3-class clock reset classification
    - ClockResetIdentifier: Post-hoc identification of timeout/special plays

    Two entry points are provided: extract() runs a sequential streaming
    pass with threaded video I/O, while extract_parallel() splits the video
    into chunks processed by multiple worker processes (fixed-coordinates
    mode only) and replays the merged frame data through the state machine.
    """
def __init__(self, config: DetectionConfig, timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = None, flag_reader: Optional[FlagReader] = None):
"""
Initialize the play extractor pipeline.
Args:
config: Detection configuration
timeout_tracker: Optional timeout tracker for clock reset classification
flag_reader: Optional FLAG reader for penalty flag detection
"""
self.config = config
self.timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = timeout_tracker
self.flag_reader = flag_reader
# Template-based clock reading components (conditionally initialized)
self.template_builder: Optional[DigitTemplateBuilder] = None
self.template_library: Optional[DigitTemplateLibrary] = None
self.template_reader: Optional[ReadPlayClock] = None
self._validate_config()
# Core components are initialized here (scorebug_detector, clock_reader, state_machine)
self._initialize_components()
def _validate_config(self) -> None:
"""Validate configuration paths exist."""
video_path = Path(self.config.video_path)
if not video_path.exists():
raise FileNotFoundError(f"Video not found: {self.config.video_path}")
# In fixed coordinates mode, template and clock config paths are not required
# since we derive the regions from the fixed coordinates
if not self.config.fixed_playclock_coords:
template_path = Path(self.config.template_path)
if not template_path.exists():
raise FileNotFoundError(f"Scorebug template not found: {self.config.template_path}")
clock_config_path = Path(self.config.clock_region_config_path)
if not clock_config_path.exists():
raise FileNotFoundError(f"Clock region config not found: {self.config.clock_region_config_path}")
    def _initialize_components(self) -> None:
        """Initialize extraction components.

        Builds, in order: the scorebug detector, the play clock region
        extractor, the play state machine, and the template-matching
        components. Two setup paths exist:
        - Fixed coordinates mode: regions are derived from absolute
          coordinates in the config.
        - Standard mode: regions come from the scorebug template file and
          the clock region config file.
        """
        logger.info("Initializing play extractor components...")
        # Determine if we're using fixed coordinates mode
        # In this mode, we still use the same logic but with pre-set regions
        use_fixed_coords = self.config.fixed_playclock_coords is not None
        if use_fixed_coords:
            # Fixed coordinates mode: derive play clock offset from absolute coords
            logger.info("Fixed coordinates mode - regions pre-configured")
            # Compute play clock offset relative to scorebug from absolute coordinates
            assert self.config.fixed_playclock_coords is not None  # Already checked above, helps mypy
            pc_x, pc_y, pc_w, pc_h = self.config.fixed_playclock_coords
            if self.config.fixed_scorebug_coords:
                sb_x, sb_y, _, _ = self.config.fixed_scorebug_coords
                x_offset = pc_x - sb_x
                y_offset = pc_y - sb_y
            else:
                # If no scorebug coords provided, treat play clock coords as offset from (0,0)
                x_offset, y_offset = pc_x, pc_y
            # Create a minimal PlayClockRegionConfig for the clock reader
            playclock_config = PlayClockRegionConfig(x_offset=x_offset, y_offset=y_offset, width=pc_w, height=pc_h, source_video="", scorebug_template="", samples_used=0)
            # Initialize scorebug detector with template for disappearance detection
            # In fixed coordinates mode, we still need template matching to detect
            # when scorebug disappears (e.g., during commercials or replays)
            self.scorebug_detector: DetectScoreBug = DetectScoreBug(
                template_path=self.config.template_path,
                fixed_region=self.config.fixed_scorebug_coords,
                use_split_detection=self.config.use_split_detection,
            )
            logger.info("Scorebug detector initialized with template and fixed region: %s", self.config.fixed_scorebug_coords)
            # Initialize play clock region extractor with the derived config
            self.clock_reader: PlayClockRegionExtractor = PlayClockRegionExtractor(region_config=playclock_config)
            logger.info("Play clock region extractor initialized with offset=(%d, %d), size=(%d, %d)", x_offset, y_offset, pc_w, pc_h)
        else:
            # Standard mode: use template and config files
            self.scorebug_detector = DetectScoreBug(template_path=self.config.template_path, use_split_detection=self.config.use_split_detection)
            logger.info("Scorebug detector initialized (split_detection=%s)", self.config.use_split_detection)
            # Initialize play clock region extractor from config file
            self.clock_reader = PlayClockRegionExtractor(region_config_path=self.config.clock_region_config_path)
            logger.info("Play clock region extractor initialized")
        # Initialize state machine
        self.state_machine: TrackPlayState = TrackPlayState()
        logger.info("State machine initialized")
        # Initialize template matching components
        # Determine region dimensions from clock reader config
        if self.clock_reader and self.clock_reader.config:
            region_w = self.clock_reader.config.width
            region_h = self.clock_reader.config.height
        else:
            # NOTE(review): 50x28 presumably matches the typical broadcast
            # play clock region size - confirm against the region calibration.
            region_w, region_h = 50, 28  # defaults
        # Try to load pre-built templates if path provided
        if self.config.digit_template_path and Path(self.config.digit_template_path).exists():
            self.template_library = DigitTemplateLibrary()
            if self.template_library.load(self.config.digit_template_path):
                logger.info("Loaded pre-built digit templates from %s", self.config.digit_template_path)
                self.template_reader = ReadPlayClock(self.template_library, region_w, region_h)
            else:
                # Load failed: discard the empty library so the builder path runs.
                self.template_library = None
                logger.info("Could not load templates, will build during extraction")
        # Initialize template builder for collection phase if no templates loaded
        if self.template_library is None:
            self.template_builder = DigitTemplateBuilder(region_w, region_h)
            logger.info("Template builder initialized for collection phase")
def _open_video_and_get_context(self) -> Tuple[VideoContext, Dict[str, Any], Dict[str, float]]:
"""
Open video and initialize processing context.
Returns:
Tuple of (VideoContext, stats dict, timing dict)
"""
# Open video
cap = cv2.VideoCapture(self.config.video_path)
if not cap.isOpened():
raise RuntimeError(f"Could not open video: {self.config.video_path}")
# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps > 0 else 0
logger.info("Video: %.1fs duration, %.2f fps, %d total frames", duration, fps, total_frames)
# Determine segment bounds
start_time = self.config.start_time
end_time = self.config.end_time if self.config.end_time else duration
# Calculate frame skip for sequential reading
frame_skip = int(self.config.frame_interval * fps)
start_frame = int(start_time * fps)
end_frame = int(end_time * fps)
# Initialize stats and timing
stats = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0}
timing = {
"video_io": 0.0,
"scorebug_detection": 0.0,
"preprocessing": 0.0,
"template_matching": 0.0,
"template_building": 0.0,
"state_machine": 0.0,
}
context = VideoContext(
cap=cap,
fps=fps,
total_frames=total_frames,
duration=duration,
start_time=start_time,
end_time=end_time,
frame_skip=frame_skip,
start_frame=start_frame,
end_frame=end_frame,
)
return context, stats, timing
def _pass0_build_templates_with_real_detection(self, timing: Dict[str, float], progress_dict: Optional[Dict[str, Any]] = None) -> bool:
"""
Pass 0: Build digit templates by scanning video using TEMPLATE-BASED scorebug detection.
Delegates to TemplateBuildingPass which handles the actual scanning and OCR.
Args:
timing: Timing dictionary to update
progress_dict: Optional dict for progress updates
Returns:
True if templates were built successfully, False otherwise
"""
# Use the extracted TemplateBuildingPass module
# Assert: template_builder is initialized when this method is called
assert self.template_builder is not None
template_pass = TemplateBuildingPass(
config=self.config,
clock_reader=self.clock_reader,
template_builder=self.template_builder,
progress_dict=progress_dict,
)
# Run template building
self.template_library, self.template_reader, build_time = template_pass.run()
timing["template_building"] = build_time
return self.template_library is not None
def _streaming_extraction_pass(self, context: VideoContext, stats: Dict[str, Any], timing: Dict[str, float]) -> List[Dict[str, Any]]:
"""
Streaming extraction pass: Read frames, process immediately, no intermediate storage.
This combines the old Pass 1 (frame extraction) and Pass 2 (template matching) into
a single streaming pass. Each frame is:
1. Read from video (in background thread)
2. Scorebug detected/verified
3. Play clock region extracted
4. Template matched immediately
5. State machine updated
Uses threaded video I/O to overlap reading with processing for better performance.
Args:
context: Video context with properties and capture object
stats: Stats dictionary to update
timing: Timing dictionary to update
Returns:
List of frame data dictionaries with all processing results
"""
logger.info("Streaming extraction pass: frame extraction + template matching...")
logger.info(
"Threaded reading: frame_skip=%d (%.2f fps effective), frames %d-%d",
context.frame_skip,
context.fps / context.frame_skip,
context.start_frame,
context.end_frame,
)
# Start threaded frame reader
frame_reader = ThreadedFrameReader(context.cap, context.start_frame, context.end_frame, context.frame_skip, queue_size=32)
frame_reader.start()
# Data structures for results
frame_data: List[Dict[str, Any]] = []
# Flag to track if we've locked the scorebug region
scorebug_region_locked = self.scorebug_detector.is_fixed_region_mode if self.scorebug_detector else False
# Progress tracking
progress_interval = int(30 / self.config.frame_interval) # Log every 30 seconds of video
try:
while True:
# Get next frame from background reader
result = frame_reader.get_frame(timeout=10.0)
if result is None:
break # End of stream
current_frame, frame = result
current_time = current_frame / context.fps
if frame is None:
logger.warning("Could not read frame %d at %.1fs", current_frame, current_time)
continue
stats["total_frames"] += 1
# Process frame with immediate template matching
frame_result = self._process_frame_streaming(frame, current_time, timing, stats, scorebug_region_locked)
# Update scorebug lock status
if not scorebug_region_locked and frame_result.get("scorebug_detected"):
if self.scorebug_detector.discover_and_lock_region(frame):
scorebug_region_locked = True
logger.info("Scorebug region locked at %s", self.scorebug_detector.fixed_region)
frame_data.append(frame_result)
# Progress logging
if stats["total_frames"] % progress_interval == 0:
progress_pct = 100 * (current_time - context.start_time) / (context.end_time - context.start_time)
logger.info("Extraction progress: %.1fs / %.1fs (%.0f%%)", current_time, context.end_time, progress_pct)
finally:
# Stop the reader thread and get I/O timing
frame_reader.stop()
timing["video_io"] = frame_reader.io_time
context.cap.release()
logger.info(
"Streaming extraction complete: %d frames processed, %d with scorebug, %d with clock",
stats["total_frames"],
stats["frames_with_scorebug"],
stats["frames_with_clock"],
)
return frame_data
def _process_frame_streaming(
self,
frame: np.ndarray[Any, Any],
current_time: float,
timing: Dict[str, float],
stats: Dict[str, Any],
scorebug_region_locked: bool,
) -> Dict[str, Any]:
"""
Process a single frame with immediate template matching.
This is the streaming version that processes each frame completely
without storing intermediate data.
Args:
frame: The video frame
current_time: Current timestamp
timing: Timing dictionary to update
stats: Stats dictionary to update
scorebug_region_locked: Whether the scorebug region has been locked
Returns:
Frame data dictionary with all processing results
"""
# Detect scorebug
t_start = time.perf_counter()
if not scorebug_region_locked:
self.scorebug_detector.discover_and_lock_region(frame)
scorebug = self.scorebug_detector.detect(frame)
timing["scorebug_detection"] += time.perf_counter() - t_start
# Initialize frame result using shared factory
frame_result = create_frame_result(
timestamp=current_time,
scorebug_detected=scorebug.detected,
scorebug_bbox=scorebug.bbox if scorebug.detected else None,
)
# Initialize timeout_info and flag_info for state machine
timeout_info = None
flag_info = None
# Determine if scorebug is actually visible (vs just assumed present in fixed coords mode)
scorebug_actually_visible = scorebug.template_matched if scorebug.template_matched is not None else scorebug.detected
if scorebug.detected:
stats["frames_with_scorebug"] += 1
# Read timeout indicators only when scorebug is actually visible
# to avoid garbage readings during commercials/replays
if self.timeout_tracker and self.timeout_tracker.is_configured() and scorebug_actually_visible:
timeout_reading = self.timeout_tracker.read_timeouts(frame)
frame_result["home_timeouts"] = timeout_reading.home_timeouts
frame_result["away_timeouts"] = timeout_reading.away_timeouts
frame_result["timeout_confidence"] = timeout_reading.confidence
# Create TimeoutInfo for state machine clock reset classification
timeout_info = TimeoutInfo(
home_timeouts=timeout_reading.home_timeouts,
away_timeouts=timeout_reading.away_timeouts,
confidence=timeout_reading.confidence,
)
# Read FLAG indicator if reader is configured
# Only read flags when scorebug is actually visible (template_matched)
# to avoid false positives during commercials/replays
if self.flag_reader and scorebug_actually_visible:
assert scorebug.bbox is not None # scorebug.detected implies bbox is set
flag_reading = self.flag_reader.read(frame, scorebug.bbox)
frame_result["flag_detected"] = flag_reading.detected
frame_result["flag_yellow_ratio"] = flag_reading.yellow_ratio
frame_result["flag_mean_hue"] = flag_reading.mean_hue
# Create FlagInfo for state machine with actual visibility status
flag_info = FlagInfo(
detected=flag_reading.detected,
yellow_ratio=flag_reading.yellow_ratio,
mean_hue=flag_reading.mean_hue,
is_valid_yellow=flag_reading.is_valid_yellow,
scorebug_verified=scorebug_actually_visible,
)
# Extract play clock region and run template matching immediately
# Use padded region when in fixed coordinates mode for shift-invariant matching
clock_result = None
t_start = time.perf_counter()
if self.config.fixed_playclock_coords and self.template_reader:
# Fixed coordinates mode: use read_from_fixed_location with padding
# Padding of 4 pixels handles small translational shifts in the broadcast
timing["preprocessing"] += time.perf_counter() - t_start
t_start = time.perf_counter()
clock_result = self.template_reader.read_from_fixed_location(frame, self.config.fixed_playclock_coords, padding=4)
timing["template_matching"] += time.perf_counter() - t_start
else:
# Standard mode: extract region then match
assert scorebug.bbox is not None # scorebug.detected implies bbox is set
play_clock_region = self.clock_reader.extract_region(frame, scorebug.bbox)
timing["preprocessing"] += time.perf_counter() - t_start
if play_clock_region is not None and self.template_reader:
# Run template matching immediately (no intermediate storage!)
t_start = time.perf_counter()
clock_result = self.template_reader.read(play_clock_region)
timing["template_matching"] += time.perf_counter() - t_start
# Process clock result (common path for both fixed and standard modes)
if clock_result is not None:
frame_result["clock_detected"] = clock_result.detected
frame_result["clock_value"] = clock_result.value
if clock_result.detected:
stats["frames_with_clock"] += 1
# Update state machine immediately with timeout and flag info
t_start = time.perf_counter()
clock_reading = PlayClockReading(
detected=clock_result.detected,
value=clock_result.value,
confidence=clock_result.confidence,
raw_text=f"TEMPLATE_{clock_result.value}" if clock_result.detected else "TEMPLATE_FAILED",
)
self.state_machine.update(current_time, scorebug, clock_reading, timeout_info, flag_info)
timing["state_machine"] += time.perf_counter() - t_start
else:
# No scorebug - still update state machine
t_start = time.perf_counter()
clock_reading = PlayClockReading(detected=False, value=None, confidence=0.0, raw_text="NO_SCOREBUG")
self.state_machine.update(current_time, scorebug, clock_reading, timeout_info, flag_info)
timing["state_machine"] += time.perf_counter() - t_start
return frame_result
def _finalize_extraction(
self,
context: VideoContext,
stats: Dict[str, Any],
timing: Dict[str, float],
frame_data: List[Dict[str, Any]],
) -> DetectionResult:
"""
Finalize extraction: run post-hoc clock reset identification and build result.
Uses ClockResetIdentifier for 3-class classification of 40→25 clock reset events:
- Class A (weird_clock): 25 counts down immediately → rejected
- Class B (timeout): Timeout indicator changed → tracked as timeout
- Class C (special): Neither A nor B → special play (punt/FG/XP)
Args:
context: Video context
stats: Processing stats
timing: Timing breakdown
frame_data: List of frame data dicts with clock values and timeout counts
Returns:
Final DetectionResult
"""
# Log timing breakdown
self._log_timing_breakdown(timing)
# Finalize state machine (close any active FLAG events)
self.state_machine.finalize(context.end_time)
# Get plays from state machine (normal 40-second plays)
state_machine_plays = self.state_machine.get_plays()
play_stats = self.state_machine.get_stats()
# Get FLAG plays (tracked independently by FlagTracker)
flag_plays = self.state_machine.get_flag_plays()
logger.info("FLAG plays detected: %d", len(flag_plays))
log_flag_plays(flag_plays, logger)
# Run post-hoc clock reset identification (40→25 transitions)
clock_reset_identifier = ClockResetIdentifier()
clock_reset_plays, clock_reset_stats = clock_reset_identifier.identify(frame_data)
logger.info(
"Clock reset identification: %d total, %d weird (rejected), %d timeouts, %d special plays",
clock_reset_stats.get("total", 0),
clock_reset_stats.get("weird_clock", 0),
clock_reset_stats.get("timeout", 0),
clock_reset_stats.get("special", 0),
)
# Merge clock reset stats into play stats
play_stats["clock_reset_events"] = clock_reset_stats
# Merge state machine plays with clock reset plays AND FLAG plays using PlayMerger
# Note: FLAG plays have absolute priority and are NEVER filtered
merger = PlayMerger()
plays = merger.merge(state_machine_plays, clock_reset_plays, flag_plays)
# Recalculate stats from merged plays
start_methods: Dict[str, int] = {}
end_methods: Dict[str, int] = {}
play_types: Dict[str, int] = {}
flag_plays_count = 0
for play in plays:
start_methods[play.start_method] = start_methods.get(play.start_method, 0) + 1
end_methods[play.end_method] = end_methods.get(play.end_method, 0) + 1
play_types[play.play_type] = play_types.get(play.play_type, 0) + 1
if play.play_type == "flag":
flag_plays_count += 1
play_stats["total_plays"] = len(plays)
play_stats["start_methods"] = start_methods
play_stats["end_methods"] = end_methods
play_stats["play_types"] = play_types
play_stats["flag_plays"] = flag_plays_count
result = DetectionResult(
video=Path(self.config.video_path).name,
segment_start=context.start_time,
segment_end=context.end_time,
total_frames_processed=stats["total_frames"],
frames_with_scorebug=stats["frames_with_scorebug"],
frames_with_clock=stats["frames_with_clock"],
plays=[self._play_to_dict(p) for p in plays],
stats=play_stats,
timing=timing,
)
# Log final summary
logger.info("Extraction complete!")
logger.info("Processed %d frames", stats["total_frames"])
logger.info("Frames with scorebug: %d (%.1f%%)", stats["frames_with_scorebug"], 100 * stats["frames_with_scorebug"] / max(1, stats["total_frames"]))
logger.info("Frames with clock: %d (%.1f%%)", stats["frames_with_clock"], 100 * stats["frames_with_clock"] / max(1, stats["total_frames"]))
logger.info("Plays extracted: %d", len(plays))
return result
def _log_timing_breakdown(self, timing: Dict[str, float]) -> None:
"""Log the timing breakdown for the extraction run."""
total_time = sum(timing.values())
logger.info("=" * 50)
logger.info("TIMING BREAKDOWN")
logger.info("=" * 50)
for section, t_duration in timing.items():
pct = 100 * t_duration / total_time if total_time > 0 else 0
logger.info(" %s: %.2fs (%.1f%%)", section, t_duration, pct)
logger.info(" TOTAL: %.2fs", total_time)
logger.info("=" * 50)
def extract(self) -> DetectionResult:
"""
Run play extraction on the video segment.
Uses streaming processing for optimal performance:
- Pass 0 (if needed): Build digit templates using OCR on scorebug-verified frames
- Streaming pass: Read frame -> extract region -> template match -> state machine update
(threaded video I/O overlaps reading with processing)
- Finalize: Clock reset identification and result building
When fixed coordinates are provided, the scorebug detection step simply verifies
the scorebug is present at the known location (faster than searching).
Returns:
DetectionResult with all extracted plays
"""
logger.info("Starting play extraction...")
logger.info("Video: %s", self.config.video_path)
logger.info("Segment: %.1fs to %s", self.config.start_time, self.config.end_time or "end")
# Initialize timing dict early (needed for Pass 0)
timing = {
"video_io": 0.0,
"scorebug_detection": 0.0,
"preprocessing": 0.0,
"template_matching": 0.0,
"template_building": 0.0,
"state_machine": 0.0,
}
# Pass 0: Build templates using REAL scorebug detection (if needed)
# This scans the video looking for frames with actual scorebugs,
# not just assuming the fixed region always has a scorebug.
# This prevents building garbage templates from pre-game content.
if not self.template_reader and self.template_builder:
success = self._pass0_build_templates_with_real_detection(timing)
if not success:
logger.warning("Pass 0 failed to build templates, extraction may fail or be inaccurate")
# Log mode info (after Pass 0 so we can show if templates were built)
self._log_extraction_mode()
# Initialize video and get processing context
context, stats, _ = self._open_video_and_get_context()
# Streaming extraction pass: read frames + template match + state machine (all in one)
# Uses threaded video I/O to overlap reading with processing
# Returns frame_data needed for post-hoc clock reset identification
frame_data = self._streaming_extraction_pass(context, stats, timing)
# Finalize: Post-hoc clock reset identification (Class A/B/C) and result building
return self._finalize_extraction(context, stats, timing, frame_data)
    # pylint: disable=too-many-locals
    def extract_parallel(self, num_workers: int = 2, output_dir: Optional[Path] = None, progress_dict: Optional[Dict[str, Any]] = None) -> DetectionResult:
        """
        Run play extraction using parallel chunk processing.

        This provides ~26% speedup over sequential processing by using multiple
        processes to read and process different segments of the video simultaneously.

        Process:
        1. Pass 0: Build digit templates (single-threaded, required for clock reading)
        2. Save templates to disk for worker processes to load
        3. Parallel pass: Each worker processes a video chunk independently
        4. Merge: Combine frame data from all chunks in chronological order
        5. State machine: Process merged data to extract plays

        Args:
            num_workers: Number of parallel workers (default 2).
            output_dir: Output directory for templates (required).
            progress_dict: Optional dict to receive progress updates with keys:
                overall_pct (0-100), worker_pcts (dict), complete (bool).

        Returns:
            DetectionResult with all extracted plays
        """
        logger.info("Starting parallel play extraction (%d workers)...", num_workers)
        logger.info("Video: %s", self.config.video_path)
        logger.info("Segment: %.1fs to %s", self.config.start_time, self.config.end_time or "end")
        # Initialize timing dict
        timing = {
            "video_io": 0.0,
            "scorebug_detection": 0.0,
            "preprocessing": 0.0,
            "template_matching": 0.0,
            "template_building": 0.0,
            "state_machine": 0.0,
        }
        # Pass 0: Build templates (required before parallel processing)
        if not self.template_reader and self.template_builder:
            success = self._pass0_build_templates_with_real_detection(timing, progress_dict=progress_dict)
            if not success:
                logger.warning("Pass 0 failed to build templates, extraction may fail or be inaccurate")
        # Save templates to disk for worker processes
        template_path = None
        if self.template_library and output_dir:
            template_path = output_dir / "debug" / "digit_templates"
            self.template_library.save(str(template_path))
            logger.info("Templates saved to %s for parallel workers", template_path)
        # Get video duration for end_time if not specified
        cap = cv2.VideoCapture(self.config.video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps if fps > 0 else 0
        cap.release()
        end_time = self.config.end_time if self.config.end_time else duration
        # Get timeout config path
        # NOTE(review): this filename sanitization must stay in sync with
        # wherever the timeout config file is written - confirm against the
        # timeout calibration code.
        timeout_config_path = None
        if self.timeout_tracker and output_dir:
            timeout_config_path = str(output_dir / f"{Path(self.config.video_path).stem.replace(' ', '_').replace('.', '_')}_timeout_config.json")
            if not Path(timeout_config_path).exists():
                timeout_config_path = None
        # Run parallel processing
        logger.info("Starting parallel frame extraction...")
        t_parallel_start = time.perf_counter()
        # Create parallel processing config
        # Asserts: validated by _validate_config, parallel mode requires fixed coords
        assert self.config.fixed_playclock_coords is not None
        assert self.config.fixed_scorebug_coords is not None
        # Extract FLAG region config from flag_reader if available
        flag_x_offset = None
        flag_y_offset = None
        flag_width = None
        flag_height = None
        if self.flag_reader:
            flag_x_offset = self.flag_reader.flag_x_offset
            flag_y_offset = self.flag_reader.flag_y_offset
            flag_width = self.flag_reader.flag_width
            flag_height = self.flag_reader.flag_height
        parallel_config = ParallelProcessingConfig(
            video_path=self.config.video_path,
            start_time=self.config.start_time,
            end_time=end_time,
            frame_interval=self.config.frame_interval,
            fixed_playclock_coords=self.config.fixed_playclock_coords,
            fixed_scorebug_coords=self.config.fixed_scorebug_coords,
            template_library_path=str(template_path) if template_path else None,
            timeout_config_path=timeout_config_path,
            scorebug_template_path=self.config.template_path,  # For scorebug verification during FLAG detection
            flag_x_offset=flag_x_offset,
            flag_y_offset=flag_y_offset,
            flag_width=flag_width,
            flag_height=flag_height,
        )
        frame_data, stats, io_time = process_video_parallel(parallel_config, num_workers=num_workers, external_progress_dict=progress_dict)
        timing["video_io"] = io_time
        # Estimate template matching time from parallel processing
        # (total wall time minus I/O and template building, floored at 0).
        parallel_time = time.perf_counter() - t_parallel_start
        timing["template_matching"] = max(0, parallel_time - io_time - timing["template_building"])
        logger.info("Parallel processing complete: %d frames", stats["total_frames"])
        # Create a minimal context for finalization (cap=None: video is closed)
        context = VideoContext(
            cap=None,
            fps=fps,
            total_frames=total_frames,
            duration=duration,
            start_time=self.config.start_time,
            end_time=end_time,
            frame_skip=int(self.config.frame_interval * fps),
            start_frame=int(self.config.start_time * fps),
            end_frame=int(end_time * fps),
        )
        # Run state machine on merged frame data
        t_sm_start = time.perf_counter()
        for frame in frame_data:
            # Create proper objects for state machine
            # In fixed coords mode: detected=True (assumed), template_matched=actual visibility
            scorebug = ScorebugDetection(
                detected=frame.get("scorebug_detected", False),
                bbox=frame.get("scorebug_bbox"),
                confidence=1.0 if frame.get("scorebug_detected") else 0.0,
                template_matched=frame.get("scorebug_template_matched"),  # For special play end detection
            )
            clock_reading = PlayClockReading(
                detected=frame.get("clock_detected", False),
                value=frame.get("clock_value"),
                confidence=1.0 if frame.get("clock_detected") else 0.0,
                raw_text=f"PARALLEL_{frame.get('clock_value')}" if frame.get("clock_detected") else "PARALLEL_FAILED",
            )
            # Create timeout info for clock reset classification
            timeout_info = None
            if frame.get("home_timeouts") is not None or frame.get("away_timeouts") is not None:
                timeout_info = TimeoutInfo(
                    home_timeouts=frame.get("home_timeouts"),
                    away_timeouts=frame.get("away_timeouts"),
                    confidence=frame.get("timeout_confidence", 0.0),
                )
            # Create FLAG info for penalty flag tracking
            # Use scorebug_template_matched (actual visibility) to filter false positives during replays/commercials
            flag_info = None
            if frame.get("flag_detected") is not None:
                # In fixed coords mode: scorebug_detected=True (assumed), scorebug_template_matched=actual visibility
                scorebug_actually_visible = frame.get("scorebug_template_matched")
                if scorebug_actually_visible is None:
                    scorebug_actually_visible = frame.get("scorebug_detected", True)
                flag_info = FlagInfo(
                    detected=frame.get("flag_detected", False),
                    yellow_ratio=frame.get("flag_yellow_ratio", 0.0),
                    mean_hue=frame.get("flag_mean_hue", 0.0),
                    scorebug_verified=scorebug_actually_visible,
                )
            self.state_machine.update(frame["timestamp"], scorebug, clock_reading, timeout_info, flag_info)
        timing["state_machine"] = time.perf_counter() - t_sm_start
        # Update stats dict
        stats_dict = {
            "total_frames": stats["total_frames"],
            "frames_with_scorebug": stats["frames_with_scorebug"],
            "frames_with_clock": stats["frames_with_clock"],
        }
        # Finalize: Post-hoc clock reset identification (Class A/B/C) and result building
        return self._finalize_extraction(context, stats_dict, timing, frame_data)
def _log_extraction_mode(self) -> None:
"""Log the extraction mode being used."""
use_fixed_region = self.scorebug_detector and self.scorebug_detector.is_fixed_region_mode
if use_fixed_region:
logger.info("Mode: Fixed region (scorebug location pre-configured)")
if self.scorebug_detector.fixed_region:
logger.info(" Scorebug region: %s", self.scorebug_detector.fixed_region)
else:
logger.info("Mode: Dynamic scorebug detection (will discover and lock region)")
logger.info("Clock reading: Template matching (34x faster than OCR)")
if self.template_reader:
logger.info(" Templates ready (built via real scorebug detection)")
else:
logger.info(" Will build templates using fallback method")
def _play_to_dict(self, play: PlayEvent) -> Dict[str, Any]:
"""Convert PlayEvent to dictionary for JSON serialization."""
return {
"play_number": play.play_number,
"start_time": play.start_time,
"end_time": play.end_time,
"duration": play.end_time - play.start_time,
"confidence": play.confidence,
"start_method": play.start_method,
"end_method": play.end_method,
"direct_end_time": play.direct_end_time,
"start_clock_value": play.start_clock_value,
"end_clock_value": play.end_clock_value,
"play_type": play.play_type,
"has_flag": play.has_flag,
}
def save_results(self, result: DetectionResult, output_path: str) -> None:
"""
Save extraction results to a JSON file.
Args:
result: Extraction results
output_path: Path to output file
"""
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
data = format_extraction_result_dict(result)
# Include configuration if provided (for reproducibility)
if result.config:
data["config"] = result.config
with open(output, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
logger.info("Results saved to %s", output_path)