""" Parallel video processing module. Provides functions for processing video in parallel chunks to achieve speedup on multi-core systems. Uses OpenCV for video reading with each worker process handling a separate time segment of the video. """ import logging import sys import threading import time from concurrent.futures import Future, ProcessPoolExecutor, as_completed from multiprocessing import Manager from pathlib import Path from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Union from detection.timeouts import CalibratedTimeoutDetector from utils import create_frame_result from .models import ChunkResult, ParallelProcessingConfig logger = logging.getLogger(__name__) # ============================================================================= # Chunk Processing Helper Functions (run in subprocess) # ============================================================================= # pylint: disable=too-many-locals def _init_chunk_detectors(config: ParallelProcessingConfig) -> Tuple[Any, Any, Any, Any, Any]: """ Initialize all detection components for a chunk worker. Must be called within the subprocess since these objects can't be pickled. Note: Imports are inside this function because multiprocessing requires fresh imports in each worker process. Moving these to module level would cause pickling errors when spawning workers. Args: config: Parallel processing configuration. Returns: Tuple of (scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader). """ # pylint: disable=import-outside-toplevel # Imports must be inside function for multiprocessing - each subprocess # needs its own fresh imports of these modules to avoid pickling errors from detection import DetectScoreBug, DetectTimeouts from readers import FlagReader, ReadPlayClock from setup import DigitTemplateLibrary, PlayClockRegionConfig, PlayClockRegionExtractor # Create scorebug detector with fixed region and template for verification # Template is needed to correctly detect when scorebug is NOT present (replays, commercials) scorebug_detector = DetectScoreBug(template_path=config.scorebug_template_path, use_split_detection=True) scorebug_detector.set_fixed_region(config.fixed_scorebug_coords) # Create play clock region extractor pc_x, pc_y, pc_w, pc_h = config.fixed_playclock_coords sb_x, sb_y, _, _ = config.fixed_scorebug_coords playclock_config = PlayClockRegionConfig( x_offset=pc_x - sb_x, y_offset=pc_y - sb_y, width=pc_w, height=pc_h, source_video="", scorebug_template="", samples_used=0, ) clock_reader = PlayClockRegionExtractor(region_config=playclock_config) # Create template reader if template path provided template_reader = None if config.template_library_path and Path(config.template_library_path).exists(): template_library = DigitTemplateLibrary() if template_library.load(config.template_library_path): template_reader = ReadPlayClock(template_library, pc_w, pc_h) # Initialize timeout tracker if config provided # Try calibrated detector first (with oval positions), fall back to legacy timeout_tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = None if config.timeout_config_path and Path(config.timeout_config_path).exists(): # Try to load as calibrated detector first calibrated = CalibratedTimeoutDetector(config_path=config.timeout_config_path) if calibrated.is_configured(): timeout_tracker = calibrated else: # Fall back to legacy detector timeout_tracker = DetectTimeouts(config_path=config.timeout_config_path) # Initialize FLAG reader if config provided (values verified non-None) flag_reader = None if config.flag_width and config.flag_height and config.flag_x_offset is not None and config.flag_y_offset is not None: flag_reader = FlagReader( flag_x_offset=config.flag_x_offset, flag_y_offset=config.flag_y_offset, flag_width=config.flag_width, flag_height=config.flag_height, ) return scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader # pylint: enable=too-many-locals def _process_frame( img: Any, timestamp: float, scorebug_detector: Any, clock_reader: Any, template_reader: Any, timeout_tracker: Any, flag_reader: Any, stats: Dict[str, int], fixed_playclock_coords: Optional[Tuple[int, int, int, int]] = None, fixed_scorebug_coords: Optional[Tuple[int, int, int, int]] = None, ) -> Dict[str, Any]: """ Process a single video frame and extract detection results. Args: img: OpenCV image (numpy array). timestamp: Frame timestamp in seconds. scorebug_detector: Initialized DetectScoreBug. clock_reader: Initialized PlayClockRegionExtractor. template_reader: Initialized ReadPlayClock (or None). timeout_tracker: Initialized DetectTimeouts (or None). flag_reader: Initialized FlagReader (or None). stats: Mutable dict to update with detection statistics. fixed_playclock_coords: Optional fixed play clock coordinates for padded matching. fixed_scorebug_coords: Optional fixed scorebug coordinates. Returns: Dict with frame detection results. """ # Detect scorebug using template matching # In fixed region mode: scorebug.detected=True (assumed present for play tracking) # scorebug.template_matched=actual template match result (for special play end) scorebug = scorebug_detector.detect(img) # In fixed coords mode, always use fixed coords as bbox (since detected=True always) scorebug_bbox = fixed_scorebug_coords if scorebug.detected else None # Initialize frame result with: # - scorebug_detected=True (for play tracking in fixed coords mode) # - scorebug_template_matched=actual template match (for special play end detection) frame_result = create_frame_result( timestamp=timestamp, scorebug_detected=scorebug.detected, scorebug_bbox=scorebug_bbox, ) # Add template_matched for state machine to use in special play end detection frame_result["scorebug_template_matched"] = scorebug.template_matched # Determine if scorebug is actually visible (vs just assumed present in fixed coords mode) scorebug_actually_visible = scorebug.template_matched if scorebug.template_matched is not None else scorebug.detected if scorebug.detected: stats["frames_with_scorebug"] += 1 # Read timeout indicators only when scorebug is actually visible # to avoid garbage readings during commercials/replays if timeout_tracker and timeout_tracker.is_configured() and scorebug_actually_visible: timeout_reading = timeout_tracker.read_timeouts(img) frame_result["home_timeouts"] = timeout_reading.home_timeouts frame_result["away_timeouts"] = timeout_reading.away_timeouts frame_result["timeout_confidence"] = timeout_reading.confidence # Read FLAG indicator if reader is configured # Only read flags when scorebug is actually visible (template_matched) to avoid false positives if flag_reader and scorebug_bbox and scorebug_actually_visible: flag_reading = flag_reader.read(img, scorebug_bbox) frame_result["flag_detected"] = flag_reading.detected frame_result["flag_yellow_ratio"] = flag_reading.yellow_ratio frame_result["flag_mean_hue"] = flag_reading.mean_hue # Read play clock using padded region for shift-invariant matching # Padding of 4 pixels handles small translational shifts in the broadcast if fixed_playclock_coords and template_reader: clock_result = template_reader.read_from_fixed_location(img, fixed_playclock_coords, padding=10) frame_result["clock_detected"] = clock_result.detected frame_result["clock_value"] = clock_result.value if clock_result.detected: stats["frames_with_clock"] += 1 elif template_reader and scorebug_bbox: # Fallback: extract region then match (for non-fixed-coords mode) play_clock_region = clock_reader.extract_region(img, scorebug_bbox) if play_clock_region is not None: clock_result = template_reader.read(play_clock_region) frame_result["clock_detected"] = clock_result.detected frame_result["clock_value"] = clock_result.value if clock_result.detected: stats["frames_with_clock"] += 1 return frame_result # pylint: disable=too-many-locals def _process_chunk( chunk_id: int, config: ParallelProcessingConfig, chunk_start: float, chunk_end: float, progress_dict: Optional[MutableMapping[int, Any]] = None, ) -> ChunkResult: """ Process a single video chunk using FFmpeg pipe for accurate VFR handling. This function runs in a separate process and must be self-contained. It uses FFmpeg for frame extraction which correctly handles Variable Frame Rate videos where OpenCV seeking would return frames out of chronological order. Args: chunk_id: Identifier for this chunk (for logging). config: Parallel processing configuration. chunk_start: Chunk start time in seconds. chunk_end: Chunk end time in seconds. progress_dict: Shared dictionary for progress updates. Returns: ChunkResult with processing results. """ # pylint: disable=import-outside-toplevel # Import must be inside function for multiprocessing - each subprocess # needs its own fresh imports to avoid pickling errors from video.ffmpeg_reader import FFmpegFrameReader t_start = time.perf_counter() # Initialize all detection components scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader = _init_chunk_detectors(config) # Initialize processing state frame_data: List[Dict[str, Any]] = [] stats = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0} total_expected_frames = max(1, int((chunk_end - chunk_start) / config.frame_interval)) # Initialize progress if progress_dict is not None: progress_dict[chunk_id] = {"frames": 0, "total": total_expected_frames, "status": "running"} # Use FFmpeg pipe for accurate timestamp handling (handles VFR videos correctly) # This is ~36x faster than OpenCV seeking and produces correct frame order with FFmpegFrameReader(config.video_path, chunk_start, chunk_end, config.frame_interval) as reader: for timestamp, img in reader: # Process this frame stats["total_frames"] += 1 frame_result = _process_frame( img, timestamp, scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader, stats, fixed_playclock_coords=config.fixed_playclock_coords, fixed_scorebug_coords=config.fixed_scorebug_coords, ) frame_data.append(frame_result) # Update progress if progress_dict is not None: progress_dict[chunk_id] = {"frames": stats["total_frames"], "total": total_expected_frames, "status": "running"} # Get I/O timing from reader _, io_time = reader.get_stats() # Mark chunk as complete if progress_dict is not None: progress_dict[chunk_id] = {"frames": stats["total_frames"], "total": total_expected_frames, "status": "complete"} return ChunkResult( chunk_id=chunk_id, start_time=chunk_start, end_time=chunk_end, frames_processed=stats["total_frames"], frames_with_scorebug=stats["frames_with_scorebug"], frames_with_clock=stats["frames_with_clock"], frame_data=frame_data, io_time=io_time, processing_time=time.perf_counter() - t_start, ) # ============================================================================= # Parallel Orchestration Helper Functions (run in main process) # ============================================================================= def _calculate_chunk_boundaries(start_time: float, end_time: float, num_workers: int) -> List[Tuple[int, float, float]]: """ Calculate time boundaries for each parallel chunk. Args: start_time: Video start time in seconds. end_time: Video end time in seconds. num_workers: Number of parallel workers. Returns: List of (chunk_id, chunk_start, chunk_end) tuples. """ total_duration = end_time - start_time chunk_duration = total_duration / num_workers chunks = [] for i in range(num_workers): chunk_start = start_time + (i * chunk_duration) chunk_end = start_time + ((i + 1) * chunk_duration) if i == num_workers - 1: chunk_end = end_time # Ensure last chunk goes to exact end chunks.append((i, chunk_start, chunk_end)) return chunks def _create_progress_monitor(progress_dict: MutableMapping[int, Any], num_workers: int) -> Tuple[threading.Thread, threading.Event]: """ Create a progress monitoring thread. Args: progress_dict: Shared dict for worker progress updates. num_workers: Total number of workers to monitor. Returns: Tuple of (monitor_thread, stop_event). """ stop_monitor = threading.Event() def monitor_progress() -> None: """Monitor and display progress from workers.""" while not stop_monitor.is_set(): _display_progress(progress_dict, num_workers) time.sleep(0.5) monitor_thread = threading.Thread(target=monitor_progress, daemon=True) return monitor_thread, stop_monitor def _display_progress(progress_dict: MutableMapping[int, Any], num_workers: int) -> None: """ Build and display current progress string. Args: progress_dict: Shared dict with worker progress. num_workers: Total number of workers. """ parts = [] completed_workers = 0 total_frames_done = 0 total_frames_expected = 0 for i in range(num_workers): if i in progress_dict: worker_progress = progress_dict[i] frames = worker_progress.get("frames", 0) total = worker_progress.get("total", 1) status = worker_progress.get("status", "running") total_frames_done += frames total_frames_expected += total if status == "complete": completed_workers += 1 parts.append(f"W{i}: ✓") else: pct = min(100, int(100 * frames / total)) if total > 0 else 0 parts.append(f"W{i}: {pct}%") if not parts: return # Calculate overall progress, capped at 100% overall_pct = min(100, int(100 * total_frames_done / total_frames_expected)) if total_frames_expected > 0 else 0 progress_str = " | ".join(parts) # Use chained comparison for cleaner code if 0 < completed_workers < num_workers: remaining = num_workers - completed_workers status_msg = f" [{overall_pct:3d}%] {progress_str} — waiting for {remaining} worker{'s' if remaining > 1 else ''}..." elif completed_workers == num_workers: status_msg = f" [100%] {progress_str} — complete!" else: status_msg = f" [{overall_pct:3d}%] {progress_str}" sys.stdout.write(f"\r{status_msg:<80}") sys.stdout.flush() def _submit_chunk_jobs( executor: ProcessPoolExecutor, chunks: List[Tuple[int, float, float]], config: ParallelProcessingConfig, progress_dict: MutableMapping[int, Any], ) -> Dict[Future[ChunkResult], int]: """ Submit all chunk processing jobs to the executor. Args: executor: ProcessPoolExecutor instance. chunks: List of (chunk_id, start_time, end_time) tuples. config: Parallel processing configuration. progress_dict: Shared progress dictionary. Returns: Dict mapping futures to chunk IDs. """ futures = {} for chunk_id, chunk_start, chunk_end in chunks: future = executor.submit( _process_chunk, chunk_id, config, chunk_start, chunk_end, progress_dict, ) futures[future] = chunk_id return futures def _collect_chunk_results(futures: Dict[Future[ChunkResult], int]) -> Dict[int, Optional[ChunkResult]]: """ Collect results from all chunk futures as they complete. Args: futures: Dict mapping futures to chunk IDs. Returns: Dict mapping chunk IDs to ChunkResults (or None on failure). """ results: Dict[int, Optional[ChunkResult]] = {} for future in as_completed(futures): chunk_id = futures[future] try: result = future.result() results[chunk_id] = result logger.info( " Chunk %d complete: %d frames (%.1fs I/O, %.1fs total)", chunk_id, result.frames_processed, result.io_time, result.processing_time, ) except Exception as e: # pylint: disable=broad-except logger.error("Chunk %d failed: %s", chunk_id, e) results[chunk_id] = None return results def _merge_chunk_results(results: Dict[int, Optional[ChunkResult]], num_workers: int) -> Tuple[List[Dict[str, Any]], Dict[str, int], float]: """ Merge results from all chunks in chronological order. Args: results: Dict mapping chunk IDs to ChunkResults. num_workers: Total number of workers (for ordering). Returns: Tuple of (all_frame_data, total_stats, total_io_time). """ all_frame_data: List[Dict[str, Any]] = [] total_stats = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0} total_io_time = 0.0 for i in range(num_workers): result = results.get(i) if result: all_frame_data.extend(result.frame_data) total_stats["total_frames"] += result.frames_processed total_stats["frames_with_scorebug"] += result.frames_with_scorebug total_stats["frames_with_clock"] += result.frames_with_clock total_io_time += result.io_time return all_frame_data, total_stats, total_io_time # ============================================================================= # Main Public Function # ============================================================================= def process_video_parallel( config: ParallelProcessingConfig, num_workers: int = 2, external_progress_dict: Optional[MutableMapping[str, Any]] = None, ) -> Tuple[List[Dict[str, Any]], Dict[str, int], float]: """ Process video in parallel chunks. Splits the video into chunks and processes each chunk in a separate process. Results are merged in chronological order. Args: config: Parallel processing configuration containing video path, time bounds, frame interval, and region coordinates. num_workers: Number of parallel workers (default 2). external_progress_dict: Optional external dict for progress updates. If provided, will be updated with overall_pct (0-100) and worker_pcts dict. Returns: Tuple of (frame_data_list, stats_dict, total_io_time). """ # Calculate chunk boundaries chunks = _calculate_chunk_boundaries(config.start_time, config.end_time, num_workers) chunk_duration = (config.end_time - config.start_time) / num_workers logger.info("Parallel processing: %d workers, %.1fs per chunk", num_workers, chunk_duration) for chunk_id, chunk_start, chunk_end in chunks: logger.info(" Chunk %d: %.1fs - %.1fs", chunk_id, chunk_start, chunk_end) t_start = time.perf_counter() # Create shared progress tracking for workers manager = Manager() progress_dict = manager.dict() # Create progress monitor that also updates external dict def monitor_with_external() -> None: """Monitor and display progress, updating external dict if provided.""" while not stop_monitor.is_set(): _display_progress(progress_dict, num_workers) # Update external progress dict with computed values if external_progress_dict is not None: _update_external_progress(progress_dict, num_workers, external_progress_dict) time.sleep(0.5) stop_monitor = threading.Event() monitor_thread = threading.Thread(target=monitor_with_external, daemon=True) monitor_thread.start() # Execute chunks in parallel with ProcessPoolExecutor(max_workers=num_workers) as executor: futures = _submit_chunk_jobs(executor, chunks, config, progress_dict) results = _collect_chunk_results(futures) # Stop progress monitor and show completion stop_monitor.set() monitor_thread.join(timeout=1.0) sys.stdout.write(f"\r [100%] All {num_workers} workers complete" + " " * 40 + "\n") sys.stdout.flush() # Final update to external progress if external_progress_dict is not None: external_progress_dict["overall_pct"] = 100 external_progress_dict["complete"] = True parallel_time = time.perf_counter() - t_start # Merge results from all chunks all_frame_data, total_stats, total_io_time = _merge_chunk_results(results, num_workers) logger.info("Parallel processing complete: %d total frames in %.1fs (wall clock)", total_stats["total_frames"], parallel_time) logger.info("Combined I/O time: %.1fs (%.1fx speedup from parallelism)", total_io_time, total_io_time / parallel_time if parallel_time > 0 else 1) return all_frame_data, total_stats, total_io_time def _update_external_progress(progress_dict: MutableMapping[int, Any], num_workers: int, external_dict: MutableMapping[str, Any]) -> None: """ Update external progress dict with computed progress values. Args: progress_dict: Internal worker progress dict. num_workers: Total number of workers. external_dict: External dict to update with progress. """ worker_pcts = {} min_pct = 100 for i in range(num_workers): if i in progress_dict: worker_progress = progress_dict[i] frames = worker_progress.get("frames", 0) total = worker_progress.get("total", 1) status = worker_progress.get("status", "running") if status == "complete": pct = 100 else: pct = min(100, int(100 * frames / total)) if total > 0 else 0 worker_pcts[i] = pct min_pct = min(min_pct, pct) else: worker_pcts[i] = 0 min_pct = 0 # Use minimum worker progress as overall (most conservative) external_dict["overall_pct"] = min_pct external_dict["worker_pcts"] = worker_pcts external_dict["complete"] = False