#!/usr/bin/env python3
"""
Benchmark OCR batching strategies for play clock reading.

This script tests different OCR approaches on the 10-minute test clip:
1. Single-image OCR (current approach)
2. Batch preprocessing + sequential OCR
3. Multiprocessing with worker pool

The goal is to identify if batching can reduce the ~48% of time spent on OCR.

Usage:
    python scripts/benchmark_ocr_batching.py
"""

import json
import logging
import sys
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional

import cv2
import numpy as np

from detection import DetectScoreBug
from readers import PlayClockReading
from setup import PlayClockRegionExtractor

# Path reference for constants
PROJECT_ROOT = Path(__file__).parent.parent.parent

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Constants matching main.py testing mode
VIDEO_PATH = PROJECT_ROOT / "full_videos" / "OSU vs Tenn 12.21.24.mkv"
OUTPUT_DIR = PROJECT_ROOT / "output"
TESTING_START_TIME = 38 * 60 + 40  # 38:40
TESTING_END_TIME = 48 * 60 + 40  # 48:40 (10 minutes)
FRAME_INTERVAL = 0.5  # 2 fps sampling

# Process-wide cached EasyOCR reader (created lazily on first use).
_EASYOCR_READER = None


def _get_easyocr_reader():
    """Return a cached EasyOCR reader, constructing it on first call.

    BUG FIX: this helper was removed during the migration to template
    matching, but every benchmark below still calls it, so the script
    crashed with NameError. Restored with lazy construction so the
    expensive model load happens exactly once per process.
    """
    global _EASYOCR_READER
    if _EASYOCR_READER is None:
        import easyocr  # local import: heavy dependency, benchmark-only

        _EASYOCR_READER = easyocr.Reader(["en"], gpu=False)
    return _EASYOCR_READER


@dataclass
class FrameData:
    """Container for frame data needed for OCR processing."""

    timestamp: float  # video timestamp in seconds
    preprocessed_image: np.ndarray  # image ready for reader.readtext()
    scorebug_bbox: Tuple[int, int, int, int]


def _best_ocr_text(ocr_results) -> Tuple[str, float]:
    """Pick the highest-confidence (text, confidence) pair from EasyOCR output.

    EasyOCR detail=1 results are (bbox, text, confidence) tuples; returns
    ("", 0.0) when nothing was detected.
    """
    if ocr_results:
        best = max(ocr_results, key=lambda x: x[2])
        return best[1].strip(), best[2]
    return "", 0.0


def _parse_play_clock(text: str, confidence: float) -> PlayClockReading:
    """Validate OCR text as a play clock value (0-40 seconds inclusive).

    Non-numeric or out-of-range text yields detected=False / value=None.
    Factored out of the three benchmarks, which previously duplicated this
    logic (and called int(text) twice per frame).
    """
    try:
        value = int(text) if text else None
        if value is not None and not 0 <= value <= 40:
            value = None
        detected = value is not None
    except ValueError:
        value, detected = None, False
    return PlayClockReading(detected=detected, value=value, confidence=confidence, raw_text=text)


def extract_frames_sequential(
    video_path: str,
    start_time: float,
    end_time: float,
    frame_interval: float,
    scorebug_detector: DetectScoreBug,
    clock_reader: PlayClockRegionExtractor,
) -> Tuple[List[FrameData], dict]:
    """
    Extract and preprocess frames using sequential reading (new optimized approach).

    Seeks once to start_time, then advances with cap.grab() instead of
    repeated seeks. The scorebug region is locked after the first successful
    detection for speed.

    Returns:
        Tuple of (list of FrameData, timing dict with keys
        "video_io", "scorebug_detection", "preprocessing" in seconds)

    Raises:
        RuntimeError: if the video cannot be opened.
    """
    logger.info("Extracting frames from %.1fs to %.1fs (interval=%.2fs)...", start_time, end_time, frame_interval)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_skip = int(frame_interval * fps)
    start_frame = int(start_time * fps)
    end_frame = int(end_time * fps)

    timing = {"video_io": 0.0, "scorebug_detection": 0.0, "preprocessing": 0.0}
    frames_data = []

    try:
        # Seek to start (only initial seek)
        t_start = time.perf_counter()
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        timing["video_io"] += time.perf_counter() - t_start

        # Lock scorebug region on first detection for speed
        scorebug_locked = False
        current_frame = start_frame

        while current_frame < end_frame:
            current_time = current_frame / fps

            # Read frame
            t_start = time.perf_counter()
            ret, frame = cap.read()
            timing["video_io"] += time.perf_counter() - t_start

            if ret:
                # Detect scorebug
                t_start = time.perf_counter()
                if not scorebug_locked:
                    if scorebug_detector.discover_and_lock_region(frame):
                        scorebug_locked = True
                        logger.info("Scorebug locked at frame %d", current_frame)
                scorebug = scorebug_detector.detect(frame)
                timing["scorebug_detection"] += time.perf_counter() - t_start

                if scorebug.detected:
                    # Preprocess for OCR
                    t_start = time.perf_counter()
                    play_clock_region = clock_reader._extract_region(frame, scorebug.bbox)  # pylint: disable=protected-access
                    if play_clock_region is not None:
                        preprocessed = clock_reader._preprocess_for_ocr(play_clock_region)  # pylint: disable=protected-access
                        frames_data.append(FrameData(timestamp=current_time, preprocessed_image=preprocessed, scorebug_bbox=scorebug.bbox))
                    timing["preprocessing"] += time.perf_counter() - t_start

            # Skip frames sequentially (grab decodes headers only - cheap)
            t_start = time.perf_counter()
            for _ in range(frame_skip - 1):
                cap.grab()
            timing["video_io"] += time.perf_counter() - t_start

            current_frame += frame_skip
    finally:
        # BUG FIX: release the capture even if detection/preprocessing raises.
        cap.release()

    logger.info("Extracted %d preprocessed frames", len(frames_data))
    return frames_data, timing


def benchmark_single_ocr(frames_data: List[FrameData]) -> Tuple[List[PlayClockReading], float]:
    """
    Benchmark single-image OCR (current approach).

    Returns:
        Tuple of (list of readings, total OCR time)
    """
    logger.info("Running single-image OCR benchmark on %d frames...", len(frames_data))
    reader = _get_easyocr_reader()

    results = []
    total_time = 0.0

    for fd in frames_data:
        # Only the readtext call is timed; parsing is excluded.
        t_start = time.perf_counter()
        ocr_results = reader.readtext(fd.preprocessed_image, allowlist="0123456789", detail=1)
        total_time += time.perf_counter() - t_start

        text, confidence = _best_ocr_text(ocr_results)
        results.append(_parse_play_clock(text, confidence))

    return results, total_time


def benchmark_batch_readtext(frames_data: List[FrameData], batch_size: int = 16) -> Tuple[List[PlayClockReading], float]:
    """
    Benchmark batch OCR using EasyOCR's batch capability.

    EasyOCR can process multiple images but uses readtext per image.
    This tests if there's any benefit from batching preprocessing/inference.

    Returns:
        Tuple of (list of readings, total OCR time)
    """
    logger.info("Running batch OCR benchmark (batch_size=%d) on %d frames...", batch_size, len(frames_data))
    reader = _get_easyocr_reader()

    results = []
    total_time = 0.0

    # Process in batches
    for batch_start in range(0, len(frames_data), batch_size):
        batch = frames_data[batch_start : batch_start + batch_size]

        # Time the entire batch; parse outside the timed region.
        t_start = time.perf_counter()
        batch_results = []
        for fd in batch:
            ocr_results = reader.readtext(fd.preprocessed_image, allowlist="0123456789", detail=1)
            batch_results.append(_best_ocr_text(ocr_results))
        total_time += time.perf_counter() - t_start

        # Parse results
        for text, confidence in batch_results:
            results.append(_parse_play_clock(text, confidence))

    return results, total_time


def _ocr_worker_single(preprocessed_image: np.ndarray) -> Tuple[str, float]:
    """Worker function for thread pool OCR - processes single image."""
    reader = _get_easyocr_reader()
    ocr_results = reader.readtext(preprocessed_image, allowlist="0123456789", detail=1)
    return _best_ocr_text(ocr_results)


def benchmark_threaded_ocr(frames_data: List[FrameData], num_workers: int = 4) -> Tuple[List[PlayClockReading], float]:
    """
    Benchmark threaded OCR using ThreadPoolExecutor.

    Note: Due to Python GIL, this may not provide speedup for CPU-bound tasks.
    However, EasyOCR may release the GIL during inference.

    Returns:
        Tuple of (list of readings, total OCR time)
    """
    logger.info("Running threaded OCR benchmark (workers=%d) on %d frames...", num_workers, len(frames_data))

    # Pre-warm the reader in main thread so model load isn't timed
    _get_easyocr_reader()

    results = []
    t_start = time.perf_counter()

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Submit all tasks
        futures = [executor.submit(_ocr_worker_single, fd.preprocessed_image) for fd in frames_data]

        # Collect results in order
        for future in futures:
            text, confidence = future.result()
            results.append(_parse_play_clock(text, confidence))

    total_time = time.perf_counter() - t_start
    return results, total_time


def compute_accuracy(results: List[PlayClockReading]) -> Tuple[int, int, float]:
    """Compute detection rate as (detected count, total count, percent)."""
    detected = sum(1 for r in results if r.detected)
    total = len(results)
    rate = 100 * detected / total if total > 0 else 0
    return detected, total, rate


def _speedup(baseline: float, elapsed: float) -> float:
    """Baseline/elapsed speedup ratio, guarding against a zero elapsed time."""
    return baseline / elapsed if elapsed > 0 else 0


def main():
    """Run OCR batching benchmarks. Returns a process exit code (0 = ok)."""
    logger.info("=" * 60)
    logger.info("OCR Batching Benchmark")
    logger.info("=" * 60)

    # Load configs from testing mode output
    config_path = OUTPUT_DIR / "testing_config.json"
    playclock_config_path = OUTPUT_DIR / "testing_playclock_config.json"
    template_path = OUTPUT_DIR / "testing_template.png"

    if not all(p.exists() for p in [config_path, playclock_config_path, template_path]):
        logger.error("Missing config files. Run 'python main.py --testing' first to generate them.")
        logger.error(" Expected: %s", config_path)
        logger.error(" Expected: %s", playclock_config_path)
        logger.error(" Expected: %s", template_path)
        return 1

    # Load configs
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)

    # Initialize detectors
    scorebug_detector = DetectScoreBug(template_path=str(template_path))
    fixed_region = (config["scorebug_x"], config["scorebug_y"], config["scorebug_width"], config["scorebug_height"])
    scorebug_detector.set_fixed_region(fixed_region)

    clock_reader = PlayClockRegionExtractor(region_config_path=str(playclock_config_path))

    # Extract and preprocess all frames
    logger.info("\n--- Frame Extraction Phase ---")
    frames_data, extract_timing = extract_frames_sequential(
        str(VIDEO_PATH),
        TESTING_START_TIME,
        TESTING_END_TIME,
        FRAME_INTERVAL,
        scorebug_detector,
        clock_reader,
    )
    logger.info("Extraction timing:")
    for k, v in extract_timing.items():
        logger.info(" %s: %.2fs", k, v)
    logger.info(" Total: %.2fs", sum(extract_timing.values()))

    if not frames_data:
        logger.error("No frames extracted!")
        return 1

    # Run benchmarks
    logger.info("\n--- OCR Benchmarks ---")

    # Benchmark 1: Single-image OCR (baseline)
    results_single, time_single = benchmark_single_ocr(frames_data)
    det_single, tot_single, rate_single = compute_accuracy(results_single)
    logger.info("\n[1] Single-image OCR:")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_single, 1000 * time_single / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_single, tot_single, rate_single)

    # Benchmark 2: Batch OCR (same reader, batched calls)
    results_batch, time_batch = benchmark_batch_readtext(frames_data, batch_size=16)
    det_batch, tot_batch, rate_batch = compute_accuracy(results_batch)
    logger.info("\n[2] Batch OCR (batch_size=16):")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_batch, 1000 * time_batch / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_batch, tot_batch, rate_batch)
    logger.info(" Speedup vs single: %.2fx", _speedup(time_single, time_batch))

    # Benchmark 3: Threaded OCR (2 workers)
    results_thread2, time_thread2 = benchmark_threaded_ocr(frames_data, num_workers=2)
    det_thread2, tot_thread2, rate_thread2 = compute_accuracy(results_thread2)
    logger.info("\n[3] Threaded OCR (2 workers):")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_thread2, 1000 * time_thread2 / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_thread2, tot_thread2, rate_thread2)
    logger.info(" Speedup vs single: %.2fx", _speedup(time_single, time_thread2))

    # Benchmark 4: Threaded OCR (4 workers)
    results_thread4, time_thread4 = benchmark_threaded_ocr(frames_data, num_workers=4)
    det_thread4, tot_thread4, rate_thread4 = compute_accuracy(results_thread4)
    logger.info("\n[4] Threaded OCR (4 workers):")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_thread4, 1000 * time_thread4 / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_thread4, tot_thread4, rate_thread4)
    logger.info(" Speedup vs single: %.2fx", _speedup(time_single, time_thread4))

    # Summary
    logger.info("\n" + "=" * 60)
    logger.info("SUMMARY")
    logger.info("=" * 60)
    logger.info("Frames processed: %d", len(frames_data))
    logger.info("\nOCR Method | Time (s) | ms/frame | Speedup | Det Rate")
    logger.info("-" * 60)
    logger.info(
        "Single-image | %8.2f | %8.1f | %7.2fx | %6.1f%%",
        time_single,
        1000 * time_single / len(frames_data),
        1.0,
        rate_single,
    )
    logger.info(
        "Batch (size=16) | %8.2f | %8.1f | %7.2fx | %6.1f%%",
        time_batch,
        1000 * time_batch / len(frames_data),
        _speedup(time_single, time_batch),
        rate_batch,
    )
    logger.info(
        "Threaded (2 wkrs) | %8.2f | %8.1f | %7.2fx | %6.1f%%",
        time_thread2,
        1000 * time_thread2 / len(frames_data),
        _speedup(time_single, time_thread2),
        rate_thread2,
    )
    logger.info(
        "Threaded (4 wkrs) | %8.2f | %8.1f | %7.2fx | %6.1f%%",
        time_thread4,
        1000 * time_thread4 / len(frames_data),
        _speedup(time_single, time_thread4),
        rate_thread4,
    )
    logger.info("=" * 60)

    # Recommendations
    best_time = min(time_single, time_batch, time_thread2, time_thread4)
    if best_time == time_single:
        logger.info("\nRecommendation: Keep single-image OCR (no speedup from batching)")
    elif best_time == time_batch:
        logger.info("\nRecommendation: Use batch OCR with batch_size=16")
    elif best_time == time_thread2:
        logger.info("\nRecommendation: Use threaded OCR with 2 workers")
    else:
        logger.info("\nRecommendation: Use threaded OCR with 4 workers")

    return 0


if __name__ == "__main__":
    sys.exit(main())