cfb40 / src /pipeline /models.py
andytaylor-smg's picture
some decent progress generalizing
137c6cf
"""
Pydantic models for the pipeline module.
This module contains all the data structures used by the pipeline components
for configuration, intermediate results, and final output.
Note: OCR-based clock reading has been removed in favor of template matching.
Streaming processing is used for optimal performance (read frame -> process immediately).
See docs/ocr_to_template_migration.md for details.
"""
from typing import Optional, List, Dict, Any, Tuple
from pydantic import BaseModel, ConfigDict, Field
# =============================================================================
# Configuration Models
# =============================================================================
class DetectionConfig(BaseModel):
"""Configuration for play detection pipeline.
Uses template matching for play clock reading (~34x faster than OCR).
Templates are built dynamically during Pass 0 using OCR for labeling,
then streaming detection processes each frame immediately via template matching.
"""
video_path: str = Field(..., description="Path to video file")
template_path: str = Field(..., description="Path to scorebug template")
clock_region_config_path: str = Field(..., description="Path to play clock region config")
start_time: float = Field(0.0, description="Start time in seconds")
end_time: Optional[float] = Field(None, description="End time in seconds (None = full video)")
frame_interval: float = Field(0.5, description="Interval between frame samples (seconds)")
use_split_detection: bool = Field(True, description="Enable split-half scorebug detection for robustness to partial overlays")
# Template matching configuration
template_collection_frames: int = Field(400, description="Number of frames to use for building digit templates via OCR")
digit_template_path: Optional[str] = Field(None, description="Path to pre-built digit templates (skip collection phase if provided)")
# Fixed coordinates mode - skip scorebug detection entirely for maximum speed
fixed_playclock_coords: Optional[Tuple[int, int, int, int]] = Field(None, description="(x, y, w, h) absolute play clock coords")
fixed_scorebug_coords: Optional[Tuple[int, int, int, int]] = Field(None, description="(x, y, w, h) scorebug region (for metadata)")
# =============================================================================
# Video Processing Models
# =============================================================================
class VideoContext(BaseModel):
"""Container for video properties and processing state."""
model_config = ConfigDict(arbitrary_types_allowed=True)
cap: Any = Field(..., description="cv2.VideoCapture (using Any to avoid cv2 typing issues)")
fps: float = Field(..., description="Frames per second")
total_frames: int = Field(..., description="Total frame count")
duration: float = Field(..., description="Video duration in seconds")
start_time: float = Field(..., description="Segment start time")
end_time: float = Field(..., description="Segment end time")
frame_skip: int = Field(..., description="Frames to skip between samples")
start_frame: int = Field(..., description="First frame to process")
end_frame: int = Field(..., description="Last frame to process")
# =============================================================================
# Result Models
# =============================================================================
class DetectionResult(BaseModel):
"""Results from play detection pipeline."""
video: str = Field(..., description="Video filename")
segment_start: float = Field(..., description="Segment start time")
segment_end: float = Field(..., description="Segment end time")
total_frames_processed: int = Field(..., description="Number of frames analyzed")
frames_with_scorebug: int = Field(..., description="Frames where scorebug was detected")
frames_with_clock: int = Field(..., description="Frames where clock was read successfully")
plays: List[Dict[str, Any]] = Field(default_factory=list, description="Detected plays as dicts")
stats: Dict[str, Any] = Field(default_factory=dict, description="Summary statistics")
timing: Dict[str, float] = Field(default_factory=dict, description="Timing breakdown by section")
config: Dict[str, Any] = Field(default_factory=dict, description="Configuration used for this run (regions, thresholds, etc.)")
# =============================================================================
# Parallel Processing Models (Pydantic)
# =============================================================================
class ParallelProcessingConfig(BaseModel):
"""Configuration for parallel video chunk processing.
This model groups the configuration parameters needed by parallel processing
functions, reducing the number of individual arguments.
"""
video_path: str = Field(..., description="Path to video file")
start_time: float = Field(..., description="Start time in seconds")
end_time: float = Field(..., description="End time in seconds")
frame_interval: float = Field(..., description="Time interval between frames")
fixed_playclock_coords: Tuple[int, int, int, int] = Field(..., description="(x, y, w, h) for play clock region")
fixed_scorebug_coords: Tuple[int, int, int, int] = Field(..., description="(x, y, w, h) for scorebug region")
template_library_path: Optional[str] = Field(None, description="Path to template library directory")
timeout_config_path: Optional[str] = Field(None, description="Path to timeout tracker config")
scorebug_template_path: Optional[str] = Field(None, description="Path to scorebug template image for verification")
# FLAG region config (offsets relative to scorebug)
flag_x_offset: Optional[int] = Field(None, description="FLAG region X offset from scorebug")
flag_y_offset: Optional[int] = Field(None, description="FLAG region Y offset from scorebug")
flag_width: Optional[int] = Field(None, description="FLAG region width")
flag_height: Optional[int] = Field(None, description="FLAG region height")
class ChunkResult(BaseModel):
"""Result from processing a single video chunk in parallel processing.
This is a Pydantic model for better serialization and validation support
when passing data between worker processes.
"""
chunk_id: int = Field(..., description="Identifier for this chunk")
start_time: float = Field(..., description="Chunk start time in seconds")
end_time: float = Field(..., description="Chunk end time in seconds")
frames_processed: int = Field(..., description="Total frames processed in this chunk")
frames_with_scorebug: int = Field(..., description="Frames where scorebug was detected")
frames_with_clock: int = Field(..., description="Frames where clock was successfully read")
frame_data: List[Dict[str, Any]] = Field(..., description="Per-frame detection results")
io_time: float = Field(..., description="Time spent on video I/O operations")
processing_time: float = Field(..., description="Total processing time for this chunk")