# NOTE(review): removed non-Python page-scrape residue ("Spaces: / Sleeping / Sleeping")
# that preceded the module docstring and would be a syntax error.
| """ | |
| Parallel video processing module. | |
| Provides functions for processing video in parallel chunks to achieve | |
| speedup on multi-core systems. Uses OpenCV for video reading with each | |
| worker process handling a separate time segment of the video. | |
| """ | |
| import logging | |
| import sys | |
| import threading | |
| import time | |
| from concurrent.futures import Future, ProcessPoolExecutor, as_completed | |
| from multiprocessing import Manager | |
| from pathlib import Path | |
| from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Union | |
| from detection.timeouts import CalibratedTimeoutDetector | |
| from utils import create_frame_result | |
| from .models import ChunkResult, ParallelProcessingConfig | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================= | |
| # Chunk Processing Helper Functions (run in subprocess) | |
| # ============================================================================= | |
# pylint: disable=too-many-locals
def _init_chunk_detectors(config: ParallelProcessingConfig) -> Tuple[Any, Any, Any, Any, Any]:
    """
    Build every detection component a chunk worker needs.

    Must run inside the subprocess: none of these objects can be pickled
    across the process boundary.

    Note: Imports are inside this function because multiprocessing requires
    fresh imports in each worker process. Moving these to module level would
    cause pickling errors when spawning workers.

    Args:
        config: Parallel processing configuration.

    Returns:
        Tuple of (scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader).
    """
    # pylint: disable=import-outside-toplevel
    # Imports must be inside function for multiprocessing - each subprocess
    # needs its own fresh imports of these modules to avoid pickling errors
    from detection import DetectScoreBug, DetectTimeouts
    from readers import FlagReader, ReadPlayClock
    from setup import DigitTemplateLibrary, PlayClockRegionConfig, PlayClockRegionExtractor

    # Scorebug detector works on a fixed region; the template lets it tell
    # when the scorebug is genuinely absent (replays, commercials).
    bug_detector = DetectScoreBug(template_path=config.scorebug_template_path, use_split_detection=True)
    bug_detector.set_fixed_region(config.fixed_scorebug_coords)

    # Play clock region is expressed as an offset from the scorebug origin.
    clock_x, clock_y, clock_w, clock_h = config.fixed_playclock_coords
    bug_x, bug_y, _, _ = config.fixed_scorebug_coords
    region_config = PlayClockRegionConfig(
        x_offset=clock_x - bug_x,
        y_offset=clock_y - bug_y,
        width=clock_w,
        height=clock_h,
        source_video="",
        scorebug_template="",
        samples_used=0,
    )
    region_extractor = PlayClockRegionExtractor(region_config=region_config)

    # Digit-template reader is optional: built only when a library file
    # exists on disk and loads successfully.
    digit_reader = None
    if config.template_library_path and Path(config.template_library_path).exists():
        library = DigitTemplateLibrary()
        if library.load(config.template_library_path):
            digit_reader = ReadPlayClock(library, clock_w, clock_h)

    # Timeout tracker is optional: prefer the calibrated detector (oval
    # positions); fall back to the legacy detector when not configured.
    tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = None
    if config.timeout_config_path and Path(config.timeout_config_path).exists():
        calibrated = CalibratedTimeoutDetector(config_path=config.timeout_config_path)
        tracker = calibrated if calibrated.is_configured() else DetectTimeouts(config_path=config.timeout_config_path)

    # FLAG reader is optional: requires all four geometry values from config.
    penalty_reader = None
    if config.flag_width and config.flag_height and config.flag_x_offset is not None and config.flag_y_offset is not None:
        penalty_reader = FlagReader(
            flag_x_offset=config.flag_x_offset,
            flag_y_offset=config.flag_y_offset,
            flag_width=config.flag_width,
            flag_height=config.flag_height,
        )

    return bug_detector, region_extractor, digit_reader, tracker, penalty_reader
# pylint: enable=too-many-locals
def _process_frame(
    img: Any,
    timestamp: float,
    scorebug_detector: Any,
    clock_reader: Any,
    template_reader: Any,
    timeout_tracker: Any,
    flag_reader: Any,
    stats: Dict[str, int],
    fixed_playclock_coords: Optional[Tuple[int, int, int, int]] = None,
    fixed_scorebug_coords: Optional[Tuple[int, int, int, int]] = None,
) -> Dict[str, Any]:
    """
    Process a single video frame and extract detection results.

    Args:
        img: OpenCV image (numpy array).
        timestamp: Frame timestamp in seconds.
        scorebug_detector: Initialized DetectScoreBug.
        clock_reader: Initialized PlayClockRegionExtractor.
        template_reader: Initialized ReadPlayClock (or None).
        timeout_tracker: Initialized DetectTimeouts (or None).
        flag_reader: Initialized FlagReader (or None).
        stats: Mutable dict updated in place with detection statistics.
        fixed_playclock_coords: Optional fixed play clock coordinates for padded matching.
        fixed_scorebug_coords: Optional fixed scorebug coordinates.

    Returns:
        Dict with frame detection results.
    """
    # Detect scorebug using template matching.
    # In fixed region mode: scorebug.detected=True (assumed present for play tracking)
    # scorebug.template_matched=actual template match result (for special play end)
    scorebug = scorebug_detector.detect(img)
    # In fixed coords mode, always use fixed coords as bbox (since detected=True always)
    scorebug_bbox = fixed_scorebug_coords if scorebug.detected else None
    # Initialize frame result with:
    # - scorebug_detected=True (for play tracking in fixed coords mode)
    # - scorebug_template_matched=actual template match (for special play end detection)
    frame_result = create_frame_result(
        timestamp=timestamp,
        scorebug_detected=scorebug.detected,
        scorebug_bbox=scorebug_bbox,
    )
    # Add template_matched for state machine to use in special play end detection
    frame_result["scorebug_template_matched"] = scorebug.template_matched
    # Determine if scorebug is actually visible (vs just assumed present in fixed coords mode)
    scorebug_actually_visible = scorebug.template_matched if scorebug.template_matched is not None else scorebug.detected
    if scorebug.detected:
        stats["frames_with_scorebug"] += 1
    # Read timeout indicators only when scorebug is actually visible
    # to avoid garbage readings during commercials/replays
    if timeout_tracker and timeout_tracker.is_configured() and scorebug_actually_visible:
        timeout_reading = timeout_tracker.read_timeouts(img)
        frame_result["home_timeouts"] = timeout_reading.home_timeouts
        frame_result["away_timeouts"] = timeout_reading.away_timeouts
        frame_result["timeout_confidence"] = timeout_reading.confidence
    # Read FLAG indicator if reader is configured.
    # Only read flags when scorebug is actually visible (template_matched) to avoid false positives
    if flag_reader and scorebug_bbox and scorebug_actually_visible:
        flag_reading = flag_reader.read(img, scorebug_bbox)
        frame_result["flag_detected"] = flag_reading.detected
        frame_result["flag_yellow_ratio"] = flag_reading.yellow_ratio
        frame_result["flag_mean_hue"] = flag_reading.mean_hue
    # Read play clock. Prefer the padded fixed-location match (shift-invariant);
    # fall back to extract-then-match when fixed coords are unavailable.
    clock_result = None
    if fixed_playclock_coords and template_reader:
        # 10 pixels of padding absorbs small translational shifts in the
        # broadcast (fixes a stale comment that claimed 4 pixels).
        clock_result = template_reader.read_from_fixed_location(img, fixed_playclock_coords, padding=10)
    elif template_reader and scorebug_bbox:
        # Fallback: extract region then match (for non-fixed-coords mode)
        play_clock_region = clock_reader.extract_region(img, scorebug_bbox)
        if play_clock_region is not None:
            clock_result = template_reader.read(play_clock_region)
    # Record the reading once for both paths (previously duplicated verbatim)
    if clock_result is not None:
        frame_result["clock_detected"] = clock_result.detected
        frame_result["clock_value"] = clock_result.value
        if clock_result.detected:
            stats["frames_with_clock"] += 1
    return frame_result
# pylint: disable=too-many-locals
def _process_chunk(
    chunk_id: int,
    config: ParallelProcessingConfig,
    chunk_start: float,
    chunk_end: float,
    progress_dict: Optional[MutableMapping[int, Any]] = None,
) -> ChunkResult:
    """
    Process a single video chunk using FFmpeg pipe for accurate VFR handling.

    This function runs in a separate process and must be self-contained.
    It uses FFmpeg for frame extraction, which correctly handles Variable
    Frame Rate videos where OpenCV seeking would return frames out of
    chronological order.

    Args:
        chunk_id: Identifier for this chunk (for logging).
        config: Parallel processing configuration.
        chunk_start: Chunk start time in seconds.
        chunk_end: Chunk end time in seconds.
        progress_dict: Shared dictionary for progress updates.

    Returns:
        ChunkResult with processing results.
    """
    # pylint: disable=import-outside-toplevel
    # Import must be inside function for multiprocessing - each subprocess
    # needs its own fresh imports to avoid pickling errors
    from video.ffmpeg_reader import FFmpegFrameReader

    started_at = time.perf_counter()

    # Detection components must be constructed inside this subprocess.
    scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader = _init_chunk_detectors(config)

    frames: List[Dict[str, Any]] = []
    counters = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0}
    expected_frames = max(1, int((chunk_end - chunk_start) / config.frame_interval))

    def _report(status: str) -> None:
        """Publish this chunk's current progress to the shared dict (if any)."""
        if progress_dict is not None:
            progress_dict[chunk_id] = {"frames": counters["total_frames"], "total": expected_frames, "status": status}

    _report("running")

    # FFmpeg pipe gives accurate timestamps on VFR videos and is much faster
    # than OpenCV seeking, while producing frames in correct order.
    with FFmpegFrameReader(config.video_path, chunk_start, chunk_end, config.frame_interval) as reader:
        for timestamp, img in reader:
            counters["total_frames"] += 1
            frames.append(
                _process_frame(
                    img,
                    timestamp,
                    scorebug_detector,
                    clock_reader,
                    template_reader,
                    timeout_tracker,
                    flag_reader,
                    counters,
                    fixed_playclock_coords=config.fixed_playclock_coords,
                    fixed_scorebug_coords=config.fixed_scorebug_coords,
                )
            )
            _report("running")
        # I/O timing comes from the reader itself.
        _, io_time = reader.get_stats()

    _report("complete")

    return ChunkResult(
        chunk_id=chunk_id,
        start_time=chunk_start,
        end_time=chunk_end,
        frames_processed=counters["total_frames"],
        frames_with_scorebug=counters["frames_with_scorebug"],
        frames_with_clock=counters["frames_with_clock"],
        frame_data=frames,
        io_time=io_time,
        processing_time=time.perf_counter() - started_at,
    )
| # ============================================================================= | |
| # Parallel Orchestration Helper Functions (run in main process) | |
| # ============================================================================= | |
| def _calculate_chunk_boundaries(start_time: float, end_time: float, num_workers: int) -> List[Tuple[int, float, float]]: | |
| """ | |
| Calculate time boundaries for each parallel chunk. | |
| Args: | |
| start_time: Video start time in seconds. | |
| end_time: Video end time in seconds. | |
| num_workers: Number of parallel workers. | |
| Returns: | |
| List of (chunk_id, chunk_start, chunk_end) tuples. | |
| """ | |
| total_duration = end_time - start_time | |
| chunk_duration = total_duration / num_workers | |
| chunks = [] | |
| for i in range(num_workers): | |
| chunk_start = start_time + (i * chunk_duration) | |
| chunk_end = start_time + ((i + 1) * chunk_duration) | |
| if i == num_workers - 1: | |
| chunk_end = end_time # Ensure last chunk goes to exact end | |
| chunks.append((i, chunk_start, chunk_end)) | |
| return chunks | |
| def _create_progress_monitor(progress_dict: MutableMapping[int, Any], num_workers: int) -> Tuple[threading.Thread, threading.Event]: | |
| """ | |
| Create a progress monitoring thread. | |
| Args: | |
| progress_dict: Shared dict for worker progress updates. | |
| num_workers: Total number of workers to monitor. | |
| Returns: | |
| Tuple of (monitor_thread, stop_event). | |
| """ | |
| stop_monitor = threading.Event() | |
| def monitor_progress() -> None: | |
| """Monitor and display progress from workers.""" | |
| while not stop_monitor.is_set(): | |
| _display_progress(progress_dict, num_workers) | |
| time.sleep(0.5) | |
| monitor_thread = threading.Thread(target=monitor_progress, daemon=True) | |
| return monitor_thread, stop_monitor | |
| def _display_progress(progress_dict: MutableMapping[int, Any], num_workers: int) -> None: | |
| """ | |
| Build and display current progress string. | |
| Args: | |
| progress_dict: Shared dict with worker progress. | |
| num_workers: Total number of workers. | |
| """ | |
| parts = [] | |
| completed_workers = 0 | |
| total_frames_done = 0 | |
| total_frames_expected = 0 | |
| for i in range(num_workers): | |
| if i in progress_dict: | |
| worker_progress = progress_dict[i] | |
| frames = worker_progress.get("frames", 0) | |
| total = worker_progress.get("total", 1) | |
| status = worker_progress.get("status", "running") | |
| total_frames_done += frames | |
| total_frames_expected += total | |
| if status == "complete": | |
| completed_workers += 1 | |
| parts.append(f"W{i}: ✓") | |
| else: | |
| pct = min(100, int(100 * frames / total)) if total > 0 else 0 | |
| parts.append(f"W{i}: {pct}%") | |
| if not parts: | |
| return | |
| # Calculate overall progress, capped at 100% | |
| overall_pct = min(100, int(100 * total_frames_done / total_frames_expected)) if total_frames_expected > 0 else 0 | |
| progress_str = " | ".join(parts) | |
| # Use chained comparison for cleaner code | |
| if 0 < completed_workers < num_workers: | |
| remaining = num_workers - completed_workers | |
| status_msg = f" [{overall_pct:3d}%] {progress_str} — waiting for {remaining} worker{'s' if remaining > 1 else ''}..." | |
| elif completed_workers == num_workers: | |
| status_msg = f" [100%] {progress_str} — complete!" | |
| else: | |
| status_msg = f" [{overall_pct:3d}%] {progress_str}" | |
| sys.stdout.write(f"\r{status_msg:<80}") | |
| sys.stdout.flush() | |
def _submit_chunk_jobs(
    executor: ProcessPoolExecutor,
    chunks: List[Tuple[int, float, float]],
    config: ParallelProcessingConfig,
    progress_dict: MutableMapping[int, Any],
) -> Dict[Future[ChunkResult], int]:
    """
    Submit one _process_chunk job per chunk to the executor.

    Args:
        executor: ProcessPoolExecutor instance.
        chunks: List of (chunk_id, start_time, end_time) tuples.
        config: Parallel processing configuration.
        progress_dict: Shared progress dictionary.

    Returns:
        Dict mapping futures to chunk IDs.
    """
    # Keyed by future so results can be matched back as they complete.
    return {
        executor.submit(_process_chunk, cid, config, c_start, c_end, progress_dict): cid
        for cid, c_start, c_end in chunks
    }
def _collect_chunk_results(futures: Dict[Future[ChunkResult], int]) -> Dict[int, Optional[ChunkResult]]:
    """
    Gather chunk results as their futures finish.

    A failed chunk is logged and recorded as None instead of aborting the run.

    Args:
        futures: Dict mapping futures to chunk IDs.

    Returns:
        Dict mapping chunk IDs to ChunkResults (or None on failure).
    """
    collected: Dict[int, Optional[ChunkResult]] = {}
    for done in as_completed(futures):
        cid = futures[done]
        try:
            chunk = done.result()
        except Exception as exc:  # pylint: disable=broad-except
            logger.error("Chunk %d failed: %s", cid, exc)
            collected[cid] = None
        else:
            collected[cid] = chunk
            logger.info(
                " Chunk %d complete: %d frames (%.1fs I/O, %.1fs total)",
                cid,
                chunk.frames_processed,
                chunk.io_time,
                chunk.processing_time,
            )
    return collected
def _merge_chunk_results(results: Dict[int, Optional[ChunkResult]], num_workers: int) -> Tuple[List[Dict[str, Any]], Dict[str, int], float]:
    """
    Concatenate per-chunk results in chunk-id (chronological) order.

    Failed chunks (None entries) are skipped.

    Args:
        results: Dict mapping chunk IDs to ChunkResults.
        num_workers: Total number of workers (for ordering).

    Returns:
        Tuple of (all_frame_data, total_stats, total_io_time).
    """
    merged_frames: List[Dict[str, Any]] = []
    merged_stats = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0}
    io_total = 0.0
    for chunk in (results.get(i) for i in range(num_workers)):
        if not chunk:
            continue
        merged_frames.extend(chunk.frame_data)
        merged_stats["total_frames"] += chunk.frames_processed
        merged_stats["frames_with_scorebug"] += chunk.frames_with_scorebug
        merged_stats["frames_with_clock"] += chunk.frames_with_clock
        io_total += chunk.io_time
    return merged_frames, merged_stats, io_total
| # ============================================================================= | |
| # Main Public Function | |
| # ============================================================================= | |
def process_video_parallel(
    config: ParallelProcessingConfig,
    num_workers: int = 2,
    external_progress_dict: Optional[MutableMapping[str, Any]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, int], float]:
    """
    Process video in parallel chunks.

    Splits the video into chunks and processes each chunk in a separate process.
    Results are merged in chronological order.

    Args:
        config: Parallel processing configuration containing video path,
            time bounds, frame interval, and region coordinates.
        num_workers: Number of parallel workers (default 2).
        external_progress_dict: Optional external dict for progress updates. If provided,
            will be updated with overall_pct (0-100) and worker_pcts dict.

    Returns:
        Tuple of (frame_data_list, stats_dict, total_io_time).
    """
    # Calculate chunk boundaries (equal time slices; last chunk clamped to end_time)
    chunks = _calculate_chunk_boundaries(config.start_time, config.end_time, num_workers)
    chunk_duration = (config.end_time - config.start_time) / num_workers
    logger.info("Parallel processing: %d workers, %.1fs per chunk", num_workers, chunk_duration)
    for chunk_id, chunk_start, chunk_end in chunks:
        logger.info(" Chunk %d: %.1fs - %.1fs", chunk_id, chunk_start, chunk_end)
    t_start = time.perf_counter()
    # Create shared progress tracking for workers.
    # A Manager dict (not a plain dict) is needed so subprocess workers can
    # publish progress back to this process.
    manager = Manager()
    progress_dict = manager.dict()
    # Create progress monitor that also updates external dict
    def monitor_with_external() -> None:
        """Monitor and display progress, updating external dict if provided."""
        # NOTE: `stop_monitor` is assigned below in the enclosing scope; the
        # late-binding closure is safe because the thread only starts after
        # that assignment has run.
        while not stop_monitor.is_set():
            _display_progress(progress_dict, num_workers)
            # Update external progress dict with computed values
            if external_progress_dict is not None:
                _update_external_progress(progress_dict, num_workers, external_progress_dict)
            time.sleep(0.5)
    stop_monitor = threading.Event()
    monitor_thread = threading.Thread(target=monitor_with_external, daemon=True)
    monitor_thread.start()
    # Execute chunks in parallel; the with-block waits for all futures.
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = _submit_chunk_jobs(executor, chunks, config, progress_dict)
        results = _collect_chunk_results(futures)
    # Stop progress monitor and show completion.
    # join() uses a timeout so a wedged monitor can't hang shutdown
    # (it is a daemon thread, so the process may still exit regardless).
    stop_monitor.set()
    monitor_thread.join(timeout=1.0)
    sys.stdout.write(f"\r [100%] All {num_workers} workers complete" + " " * 40 + "\n")
    sys.stdout.flush()
    # Final update to external progress
    if external_progress_dict is not None:
        external_progress_dict["overall_pct"] = 100
        external_progress_dict["complete"] = True
    parallel_time = time.perf_counter() - t_start
    # Merge results from all chunks (in chunk-id, i.e. chronological, order)
    all_frame_data, total_stats, total_io_time = _merge_chunk_results(results, num_workers)
    logger.info("Parallel processing complete: %d total frames in %.1fs (wall clock)", total_stats["total_frames"], parallel_time)
    logger.info("Combined I/O time: %.1fs (%.1fx speedup from parallelism)", total_io_time, total_io_time / parallel_time if parallel_time > 0 else 1)
    return all_frame_data, total_stats, total_io_time
| def _update_external_progress(progress_dict: MutableMapping[int, Any], num_workers: int, external_dict: MutableMapping[str, Any]) -> None: | |
| """ | |
| Update external progress dict with computed progress values. | |
| Args: | |
| progress_dict: Internal worker progress dict. | |
| num_workers: Total number of workers. | |
| external_dict: External dict to update with progress. | |
| """ | |
| worker_pcts = {} | |
| min_pct = 100 | |
| for i in range(num_workers): | |
| if i in progress_dict: | |
| worker_progress = progress_dict[i] | |
| frames = worker_progress.get("frames", 0) | |
| total = worker_progress.get("total", 1) | |
| status = worker_progress.get("status", "running") | |
| if status == "complete": | |
| pct = 100 | |
| else: | |
| pct = min(100, int(100 * frames / total)) if total > 0 else 0 | |
| worker_pcts[i] = pct | |
| min_pct = min(min_pct, pct) | |
| else: | |
| worker_pcts[i] = 0 | |
| min_pct = 0 | |
| # Use minimum worker progress as overall (most conservative) | |
| external_dict["overall_pct"] = min_pct | |
| external_dict["worker_pcts"] = worker_pcts | |
| external_dict["complete"] = False | |