# src/pipeline/parallel.py
"""
Parallel video processing module.
Provides functions for processing video in parallel chunks to achieve
speedup on multi-core systems. Uses OpenCV for video reading with each
worker process handling a separate time segment of the video.
"""
import logging
import sys
import threading
import time
from concurrent.futures import Future, ProcessPoolExecutor, as_completed
from multiprocessing import Manager
from pathlib import Path
from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Union
from detection.timeouts import CalibratedTimeoutDetector
from utils import create_frame_result
from .models import ChunkResult, ParallelProcessingConfig
logger = logging.getLogger(__name__)
# =============================================================================
# Chunk Processing Helper Functions (run in subprocess)
# =============================================================================
# pylint: disable=too-many-locals
def _init_chunk_detectors(config: ParallelProcessingConfig) -> Tuple[Any, Any, Any, Any, Any]:
    """
    Build the per-worker detection stack for one chunk.

    Runs inside the subprocess: none of these objects can be pickled, so
    they must be constructed after the worker has been spawned.

    Note: the imports live inside this function because multiprocessing
    requires fresh imports in each worker process; hoisting them to module
    level would cause pickling errors when spawning workers.

    Args:
        config: Parallel processing configuration.

    Returns:
        Tuple of (scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader).
    """
    # pylint: disable=import-outside-toplevel
    # Imports must stay function-local for multiprocessing - each subprocess
    # needs its own fresh imports of these modules to avoid pickling errors
    from detection import DetectScoreBug, DetectTimeouts
    from readers import FlagReader, ReadPlayClock
    from setup import DigitTemplateLibrary, PlayClockRegionConfig, PlayClockRegionExtractor

    # Scorebug detector: fixed region plus a verification template, so the
    # absence of the scorebug (replays, commercials) is detected correctly.
    detector = DetectScoreBug(template_path=config.scorebug_template_path, use_split_detection=True)
    detector.set_fixed_region(config.fixed_scorebug_coords)

    # Play clock region extractor: offsets are expressed relative to the
    # scorebug's top-left corner.
    clock_x, clock_y, clock_w, clock_h = config.fixed_playclock_coords
    bug_x, bug_y = config.fixed_scorebug_coords[0], config.fixed_scorebug_coords[1]
    region_cfg = PlayClockRegionConfig(
        x_offset=clock_x - bug_x,
        y_offset=clock_y - bug_y,
        width=clock_w,
        height=clock_h,
        source_video="",
        scorebug_template="",
        samples_used=0,
    )
    extractor = PlayClockRegionExtractor(region_config=region_cfg)

    # Digit-template clock reader: only when a library path was supplied
    # and the library actually loads.
    digit_reader = None
    if config.template_library_path and Path(config.template_library_path).exists():
        library = DigitTemplateLibrary()
        if library.load(config.template_library_path):
            digit_reader = ReadPlayClock(library, clock_w, clock_h)

    # Timeout tracker: prefer the calibrated detector (oval positions);
    # fall back to the legacy detector when it is not configured.
    tracker: Optional[Union[CalibratedTimeoutDetector, DetectTimeouts]] = None
    if config.timeout_config_path and Path(config.timeout_config_path).exists():
        candidate = CalibratedTimeoutDetector(config_path=config.timeout_config_path)
        tracker = candidate if candidate.is_configured() else DetectTimeouts(config_path=config.timeout_config_path)

    # FLAG reader: only when all geometry values are present (offsets may
    # legitimately be 0, hence the explicit `is not None` checks).
    flag = None
    if config.flag_width and config.flag_height and config.flag_x_offset is not None and config.flag_y_offset is not None:
        flag = FlagReader(
            flag_x_offset=config.flag_x_offset,
            flag_y_offset=config.flag_y_offset,
            flag_width=config.flag_width,
            flag_height=config.flag_height,
        )

    return detector, extractor, digit_reader, tracker, flag
# pylint: enable=too-many-locals
def _process_frame(
    img: Any,
    timestamp: float,
    scorebug_detector: Any,
    clock_reader: Any,
    template_reader: Any,
    timeout_tracker: Any,
    flag_reader: Any,
    stats: Dict[str, int],
    fixed_playclock_coords: Optional[Tuple[int, int, int, int]] = None,
    fixed_scorebug_coords: Optional[Tuple[int, int, int, int]] = None,
) -> Dict[str, Any]:
    """
    Process a single video frame and extract detection results.

    Side effect: increments the "frames_with_scorebug" and
    "frames_with_clock" counters in the caller-owned ``stats`` dict.

    Args:
        img: OpenCV image (numpy array).
        timestamp: Frame timestamp in seconds.
        scorebug_detector: Initialized DetectScoreBug.
        clock_reader: Initialized PlayClockRegionExtractor.
        template_reader: Initialized ReadPlayClock (or None).
        timeout_tracker: Initialized DetectTimeouts (or None).
        flag_reader: Initialized FlagReader (or None).
        stats: Mutable dict updated in place with detection statistics.
        fixed_playclock_coords: Optional fixed play clock coordinates for padded matching.
        fixed_scorebug_coords: Optional fixed scorebug coordinates.

    Returns:
        Dict with frame detection results. Timeout/flag/clock keys are only
        present when the matching reader is configured and actually ran.
    """
    # Detect scorebug using template matching
    # In fixed region mode: scorebug.detected=True (assumed present for play tracking)
    # scorebug.template_matched=actual template match result (for special play end)
    scorebug = scorebug_detector.detect(img)
    # In fixed coords mode, always use fixed coords as bbox (since detected=True always).
    # NOTE: when fixed_scorebug_coords is None the bbox stays None even if detected,
    # which disables the flag reader and the fallback clock path below.
    scorebug_bbox = fixed_scorebug_coords if scorebug.detected else None
    # Initialize frame result with:
    # - scorebug_detected=True (for play tracking in fixed coords mode)
    # - scorebug_template_matched=actual template match (for special play end detection)
    frame_result = create_frame_result(
        timestamp=timestamp,
        scorebug_detected=scorebug.detected,
        scorebug_bbox=scorebug_bbox,
    )
    # Add template_matched for state machine to use in special play end detection
    frame_result["scorebug_template_matched"] = scorebug.template_matched
    # Determine if scorebug is actually visible (vs just assumed present in fixed
    # coords mode); falls back to `detected` when template matching gave no verdict.
    scorebug_actually_visible = scorebug.template_matched if scorebug.template_matched is not None else scorebug.detected
    if scorebug.detected:
        stats["frames_with_scorebug"] += 1
    # Read timeout indicators only when scorebug is actually visible
    # to avoid garbage readings during commercials/replays
    if timeout_tracker and timeout_tracker.is_configured() and scorebug_actually_visible:
        timeout_reading = timeout_tracker.read_timeouts(img)
        frame_result["home_timeouts"] = timeout_reading.home_timeouts
        frame_result["away_timeouts"] = timeout_reading.away_timeouts
        frame_result["timeout_confidence"] = timeout_reading.confidence
    # Read FLAG indicator if reader is configured
    # Only read flags when scorebug is actually visible (template_matched) to avoid false positives
    if flag_reader and scorebug_bbox and scorebug_actually_visible:
        flag_reading = flag_reader.read(img, scorebug_bbox)
        frame_result["flag_detected"] = flag_reading.detected
        frame_result["flag_yellow_ratio"] = flag_reading.yellow_ratio
        frame_result["flag_mean_hue"] = flag_reading.mean_hue
    # Read play clock using padded region for shift-invariant matching.
    # Padding of 10 pixels handles small translational shifts in the broadcast.
    if fixed_playclock_coords and template_reader:
        clock_result = template_reader.read_from_fixed_location(img, fixed_playclock_coords, padding=10)
        frame_result["clock_detected"] = clock_result.detected
        frame_result["clock_value"] = clock_result.value
        if clock_result.detected:
            stats["frames_with_clock"] += 1
    elif template_reader and scorebug_bbox:
        # Fallback: extract region then match (for non-fixed-coords mode)
        play_clock_region = clock_reader.extract_region(img, scorebug_bbox)
        if play_clock_region is not None:
            clock_result = template_reader.read(play_clock_region)
            frame_result["clock_detected"] = clock_result.detected
            frame_result["clock_value"] = clock_result.value
            if clock_result.detected:
                stats["frames_with_clock"] += 1
    return frame_result
# pylint: disable=too-many-locals
def _process_chunk(
    chunk_id: int,
    config: ParallelProcessingConfig,
    chunk_start: float,
    chunk_end: float,
    progress_dict: Optional[MutableMapping[int, Any]] = None,
) -> ChunkResult:
    """
    Process one time segment of the video inside a worker subprocess.

    Frames are pulled through an FFmpeg pipe, which preserves chronological
    order even for Variable Frame Rate videos (OpenCV seeking would return
    frames out of order). The function is self-contained because it executes
    in a child process.

    Args:
        chunk_id: Identifier for this chunk (for logging).
        config: Parallel processing configuration.
        chunk_start: Chunk start time in seconds.
        chunk_end: Chunk end time in seconds.
        progress_dict: Shared dictionary for progress updates.

    Returns:
        ChunkResult with processing results.
    """
    # pylint: disable=import-outside-toplevel
    # Import must be inside function for multiprocessing - each subprocess
    # needs its own fresh imports to avoid pickling errors
    from video.ffmpeg_reader import FFmpegFrameReader

    started = time.perf_counter()
    # Detection objects must be built inside this process (not picklable).
    detectors = _init_chunk_detectors(config)
    scorebug_detector, clock_reader, template_reader, timeout_tracker, flag_reader = detectors

    collected: List[Dict[str, Any]] = []
    counters = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0}
    expected_frames = max(1, int((chunk_end - chunk_start) / config.frame_interval))

    def publish(status: str) -> None:
        """Write this worker's progress into the shared dict (if any)."""
        if progress_dict is not None:
            progress_dict[chunk_id] = {
                "frames": counters["total_frames"],
                "total": expected_frames,
                "status": status,
            }

    publish("running")
    # FFmpeg pipe handles VFR timestamps correctly and is far faster than
    # OpenCV seeking.
    with FFmpegFrameReader(config.video_path, chunk_start, chunk_end, config.frame_interval) as reader:
        for timestamp, img in reader:
            counters["total_frames"] += 1
            collected.append(
                _process_frame(
                    img,
                    timestamp,
                    scorebug_detector,
                    clock_reader,
                    template_reader,
                    timeout_tracker,
                    flag_reader,
                    counters,
                    fixed_playclock_coords=config.fixed_playclock_coords,
                    fixed_scorebug_coords=config.fixed_scorebug_coords,
                )
            )
            publish("running")
        # I/O timing accumulated by the reader for this chunk.
        _, io_time = reader.get_stats()
    publish("complete")

    return ChunkResult(
        chunk_id=chunk_id,
        start_time=chunk_start,
        end_time=chunk_end,
        frames_processed=counters["total_frames"],
        frames_with_scorebug=counters["frames_with_scorebug"],
        frames_with_clock=counters["frames_with_clock"],
        frame_data=collected,
        io_time=io_time,
        processing_time=time.perf_counter() - started,
    )
# =============================================================================
# Parallel Orchestration Helper Functions (run in main process)
# =============================================================================
def _calculate_chunk_boundaries(start_time: float, end_time: float, num_workers: int) -> List[Tuple[int, float, float]]:
"""
Calculate time boundaries for each parallel chunk.
Args:
start_time: Video start time in seconds.
end_time: Video end time in seconds.
num_workers: Number of parallel workers.
Returns:
List of (chunk_id, chunk_start, chunk_end) tuples.
"""
total_duration = end_time - start_time
chunk_duration = total_duration / num_workers
chunks = []
for i in range(num_workers):
chunk_start = start_time + (i * chunk_duration)
chunk_end = start_time + ((i + 1) * chunk_duration)
if i == num_workers - 1:
chunk_end = end_time # Ensure last chunk goes to exact end
chunks.append((i, chunk_start, chunk_end))
return chunks
def _create_progress_monitor(progress_dict: MutableMapping[int, Any], num_workers: int) -> Tuple[threading.Thread, threading.Event]:
"""
Create a progress monitoring thread.
Args:
progress_dict: Shared dict for worker progress updates.
num_workers: Total number of workers to monitor.
Returns:
Tuple of (monitor_thread, stop_event).
"""
stop_monitor = threading.Event()
def monitor_progress() -> None:
"""Monitor and display progress from workers."""
while not stop_monitor.is_set():
_display_progress(progress_dict, num_workers)
time.sleep(0.5)
monitor_thread = threading.Thread(target=monitor_progress, daemon=True)
return monitor_thread, stop_monitor
def _display_progress(progress_dict: MutableMapping[int, Any], num_workers: int) -> None:
"""
Build and display current progress string.
Args:
progress_dict: Shared dict with worker progress.
num_workers: Total number of workers.
"""
parts = []
completed_workers = 0
total_frames_done = 0
total_frames_expected = 0
for i in range(num_workers):
if i in progress_dict:
worker_progress = progress_dict[i]
frames = worker_progress.get("frames", 0)
total = worker_progress.get("total", 1)
status = worker_progress.get("status", "running")
total_frames_done += frames
total_frames_expected += total
if status == "complete":
completed_workers += 1
parts.append(f"W{i}: ✓")
else:
pct = min(100, int(100 * frames / total)) if total > 0 else 0
parts.append(f"W{i}: {pct}%")
if not parts:
return
# Calculate overall progress, capped at 100%
overall_pct = min(100, int(100 * total_frames_done / total_frames_expected)) if total_frames_expected > 0 else 0
progress_str = " | ".join(parts)
# Use chained comparison for cleaner code
if 0 < completed_workers < num_workers:
remaining = num_workers - completed_workers
status_msg = f" [{overall_pct:3d}%] {progress_str} — waiting for {remaining} worker{'s' if remaining > 1 else ''}..."
elif completed_workers == num_workers:
status_msg = f" [100%] {progress_str} — complete!"
else:
status_msg = f" [{overall_pct:3d}%] {progress_str}"
sys.stdout.write(f"\r{status_msg:<80}")
sys.stdout.flush()
def _submit_chunk_jobs(
    executor: ProcessPoolExecutor,
    chunks: List[Tuple[int, float, float]],
    config: ParallelProcessingConfig,
    progress_dict: MutableMapping[int, Any],
) -> Dict[Future[ChunkResult], int]:
    """
    Queue one _process_chunk job on the executor per chunk.

    Args:
        executor: ProcessPoolExecutor instance.
        chunks: List of (chunk_id, start_time, end_time) tuples.
        config: Parallel processing configuration.
        progress_dict: Shared progress dictionary.

    Returns:
        Dict mapping each submitted future to its chunk ID.
    """
    return {
        executor.submit(_process_chunk, chunk_id, config, chunk_start, chunk_end, progress_dict): chunk_id
        for chunk_id, chunk_start, chunk_end in chunks
    }
def _collect_chunk_results(futures: Dict[Future[ChunkResult], int]) -> Dict[int, Optional[ChunkResult]]:
    """
    Drain chunk futures as they finish, mapping chunk ID to result.

    A chunk whose worker raised is logged and recorded as None rather than
    aborting the whole run.

    Args:
        futures: Dict mapping futures to chunk IDs.

    Returns:
        Dict mapping chunk IDs to ChunkResults (or None on failure).
    """
    gathered: Dict[int, Optional[ChunkResult]] = {}
    for done in as_completed(futures):
        cid = futures[done]
        try:
            chunk = done.result()
        except Exception as exc:  # pylint: disable=broad-except
            logger.error("Chunk %d failed: %s", cid, exc)
            gathered[cid] = None
        else:
            gathered[cid] = chunk
            logger.info(
                " Chunk %d complete: %d frames (%.1fs I/O, %.1fs total)",
                cid,
                chunk.frames_processed,
                chunk.io_time,
                chunk.processing_time,
            )
    return gathered
def _merge_chunk_results(results: Dict[int, Optional[ChunkResult]], num_workers: int) -> Tuple[List[Dict[str, Any]], Dict[str, int], float]:
    """
    Combine per-chunk results into one chronologically ordered dataset.

    Iterates chunk IDs 0..num_workers-1 so frame data stays in video order;
    failed chunks (missing or None) are simply skipped.

    Args:
        results: Dict mapping chunk IDs to ChunkResults.
        num_workers: Total number of workers (for ordering).

    Returns:
        Tuple of (all_frame_data, total_stats, total_io_time).
    """
    merged_frames: List[Dict[str, Any]] = []
    totals = {"total_frames": 0, "frames_with_scorebug": 0, "frames_with_clock": 0}
    io_total = 0.0
    for chunk_id in range(num_workers):
        chunk = results.get(chunk_id)
        if not chunk:
            continue
        merged_frames.extend(chunk.frame_data)
        totals["total_frames"] += chunk.frames_processed
        totals["frames_with_scorebug"] += chunk.frames_with_scorebug
        totals["frames_with_clock"] += chunk.frames_with_clock
        io_total += chunk.io_time
    return merged_frames, totals, io_total
# =============================================================================
# Main Public Function
# =============================================================================
def process_video_parallel(
    config: ParallelProcessingConfig,
    num_workers: int = 2,
    external_progress_dict: Optional[MutableMapping[str, Any]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, int], float]:
    """
    Process video in parallel chunks.

    Splits the video into chunks and processes each chunk in a separate process.
    Results are merged in chronological order.

    Fix: the multiprocessing Manager is now used as a context manager so its
    server process is shut down on exit — previously one Manager process
    leaked per call.

    Args:
        config: Parallel processing configuration containing video path,
            time bounds, frame interval, and region coordinates.
        num_workers: Number of parallel workers (default 2).
        external_progress_dict: Optional external dict for progress updates. If provided,
            will be updated with overall_pct (0-100) and worker_pcts dict.

    Returns:
        Tuple of (frame_data_list, stats_dict, total_io_time).
    """
    # Calculate chunk boundaries
    chunks = _calculate_chunk_boundaries(config.start_time, config.end_time, num_workers)
    chunk_duration = (config.end_time - config.start_time) / num_workers
    logger.info("Parallel processing: %d workers, %.1fs per chunk", num_workers, chunk_duration)
    for chunk_id, chunk_start, chunk_end in chunks:
        logger.info(" Chunk %d: %.1fs - %.1fs", chunk_id, chunk_start, chunk_end)
    t_start = time.perf_counter()
    # The Manager owns a separate server process; the with-block guarantees
    # it is released when processing finishes.
    with Manager() as manager:
        progress_dict = manager.dict()
        stop_monitor = threading.Event()

        def monitor_with_external() -> None:
            """Monitor and display progress, updating external dict if provided."""
            while not stop_monitor.is_set():
                _display_progress(progress_dict, num_workers)
                # Update external progress dict with computed values
                if external_progress_dict is not None:
                    _update_external_progress(progress_dict, num_workers, external_progress_dict)
                time.sleep(0.5)

        monitor_thread = threading.Thread(target=monitor_with_external, daemon=True)
        monitor_thread.start()
        # Execute chunks in parallel
        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            futures = _submit_chunk_jobs(executor, chunks, config, progress_dict)
            results = _collect_chunk_results(futures)
        # Stop the monitor BEFORE the Manager shuts down so the thread never
        # reads from a dead proxy dict. The loop polls the event at least
        # every 0.5s, well inside the 1.0s join timeout.
        stop_monitor.set()
        monitor_thread.join(timeout=1.0)
    sys.stdout.write(f"\r [100%] All {num_workers} workers complete" + " " * 40 + "\n")
    sys.stdout.flush()
    # Final update to external progress
    if external_progress_dict is not None:
        external_progress_dict["overall_pct"] = 100
        external_progress_dict["complete"] = True
    parallel_time = time.perf_counter() - t_start
    # Merge results from all chunks (chunk-id order keeps frames chronological)
    all_frame_data, total_stats, total_io_time = _merge_chunk_results(results, num_workers)
    logger.info("Parallel processing complete: %d total frames in %.1fs (wall clock)", total_stats["total_frames"], parallel_time)
    logger.info("Combined I/O time: %.1fs (%.1fx speedup from parallelism)", total_io_time, total_io_time / parallel_time if parallel_time > 0 else 1)
    return all_frame_data, total_stats, total_io_time
def _update_external_progress(progress_dict: MutableMapping[int, Any], num_workers: int, external_dict: MutableMapping[str, Any]) -> None:
"""
Update external progress dict with computed progress values.
Args:
progress_dict: Internal worker progress dict.
num_workers: Total number of workers.
external_dict: External dict to update with progress.
"""
worker_pcts = {}
min_pct = 100
for i in range(num_workers):
if i in progress_dict:
worker_progress = progress_dict[i]
frames = worker_progress.get("frames", 0)
total = worker_progress.get("total", 1)
status = worker_progress.get("status", "running")
if status == "complete":
pct = 100
else:
pct = min(100, int(100 * frames / total)) if total > 0 else 0
worker_pcts[i] = pct
min_pct = min(min_pct, pct)
else:
worker_pcts[i] = 0
min_pct = 0
# Use minimum worker progress as overall (most conservative)
external_dict["overall_pct"] = min_pct
external_dict["worker_pcts"] = worker_pcts
external_dict["complete"] = False