# cfb40 / scripts/archive/v2/benchmark_ocr_batching.py
# NOTE(review): the three lines below are web-page scrape residue (author avatar
# caption, commit message, commit hash) — commented out so the module parses.
# andytaylor-smg's picture | "moving stuff all around" | 6c65498
#!/usr/bin/env python3
"""
Benchmark OCR batching strategies for play clock reading.
This script tests different OCR approaches on the 10-minute test clip:
1. Single-image OCR (current approach)
2. Batch preprocessing + sequential OCR
3. Multiprocessing with worker pool
The goal is to identify if batching can reduce the ~48% of time spent on OCR.
Usage:
python scripts/benchmark_ocr_batching.py
"""
import json
import logging
import sys
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional
import cv2
import numpy as np
from detection import DetectScoreBug
from readers import PlayClockReading
from setup import PlayClockRegionExtractor
# Path reference for constants
# Repo root is three levels up from this file (scripts/archive/v2/ -> root).
PROJECT_ROOT = Path(__file__).parent.parent.parent
# Note: _get_easyocr_reader was removed during migration to template matching
# NOTE(review): the benchmark functions below still call _get_easyocr_reader;
# running this script as-is will raise NameError until that helper is restored.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Constants matching main.py testing mode
VIDEO_PATH = PROJECT_ROOT / "full_videos" / "OSU vs Tenn 12.21.24.mkv"
OUTPUT_DIR = PROJECT_ROOT / "output"
TESTING_START_TIME = 38 * 60 + 40 # 38:40
TESTING_END_TIME = 48 * 60 + 40 # 48:40 (10 minutes)
FRAME_INTERVAL = 0.5 # 2 fps sampling
@dataclass
class FrameData:
    """Container for frame data needed for OCR processing."""

    # Video timestamp in seconds (frame index / fps) at which this frame was sampled.
    timestamp: float
    # OCR-ready play-clock crop produced by PlayClockRegionExtractor._preprocess_for_ocr.
    preprocessed_image: np.ndarray
    # Scorebug bounding box from DetectScoreBug.detect(); four ints,
    # presumably (x, y, width, height) — confirm against the detector.
    scorebug_bbox: Tuple[int, int, int, int]
def extract_frames_sequential(
    video_path: str,
    start_time: float,
    end_time: float,
    frame_interval: float,
    scorebug_detector: DetectScoreBug,
    clock_reader: PlayClockRegionExtractor,
) -> Tuple[List[FrameData], dict]:
    """
    Extract and preprocess frames using sequential reading (new optimized approach).

    Seeks once to ``start_time``, then alternates one full decode (``cap.read``)
    with ``frame_skip - 1`` cheap ``cap.grab`` calls so only one frame per
    ``frame_interval`` seconds is fully decoded. The scorebug region is locked
    on the first successful detection for speed.

    Args:
        video_path: Path to the source video file.
        start_time: Clip start, in seconds.
        end_time: Clip end, in seconds (exclusive).
        frame_interval: Seconds between sampled frames.
        scorebug_detector: Detector used to find and lock the scorebug region.
        clock_reader: Extractor that crops and preprocesses the play-clock region.

    Returns:
        Tuple of (list of FrameData, timing dict with wall-clock totals for
        "video_io", "scorebug_detection" and "preprocessing").

    Raises:
        RuntimeError: If the video cannot be opened.
    """
    logger.info("Extracting frames from %.1fs to %.1fs (interval=%.2fs)...", start_time, end_time, frame_interval)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {video_path}")
    # Release the capture even if detection/preprocessing raises mid-loop;
    # the original code leaked the handle on any exception after open.
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_skip = int(frame_interval * fps)
        start_frame = int(start_time * fps)
        end_frame = int(end_time * fps)
        timing = {"video_io": 0.0, "scorebug_detection": 0.0, "preprocessing": 0.0}
        frames_data = []
        # Seek to start (only initial seek)
        t_start = time.perf_counter()
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        timing["video_io"] += time.perf_counter() - t_start
        # Lock scorebug region on first detection for speed
        scorebug_locked = False
        current_frame = start_frame
        while current_frame < end_frame:
            current_time = current_frame / fps
            # Read (decode) the sampled frame
            t_start = time.perf_counter()
            ret, frame = cap.read()
            timing["video_io"] += time.perf_counter() - t_start
            if ret:
                # Detect scorebug
                t_start = time.perf_counter()
                if not scorebug_locked:
                    if scorebug_detector.discover_and_lock_region(frame):
                        scorebug_locked = True
                        logger.info("Scorebug locked at frame %d", current_frame)
                scorebug = scorebug_detector.detect(frame)
                timing["scorebug_detection"] += time.perf_counter() - t_start
                if scorebug.detected:
                    # Preprocess for OCR
                    t_start = time.perf_counter()
                    play_clock_region = clock_reader._extract_region(frame, scorebug.bbox)  # pylint: disable=protected-access
                    if play_clock_region is not None:
                        preprocessed = clock_reader._preprocess_for_ocr(play_clock_region)  # pylint: disable=protected-access
                        frames_data.append(FrameData(timestamp=current_time, preprocessed_image=preprocessed, scorebug_bbox=scorebug.bbox))
                    timing["preprocessing"] += time.perf_counter() - t_start
            # Skip frames sequentially (grab() avoids the cost of a full read)
            t_start = time.perf_counter()
            for _ in range(frame_skip - 1):
                cap.grab()
            timing["video_io"] += time.perf_counter() - t_start
            current_frame += frame_skip
    finally:
        cap.release()
    logger.info("Extracted %d preprocessed frames", len(frames_data))
    return frames_data, timing
def benchmark_single_ocr(frames_data: List[FrameData]) -> Tuple[List[PlayClockReading], float]:
    """
    Benchmark single-image OCR (current approach).

    OCRs each preprocessed frame individually, timing only the readtext
    calls, and validates each result as a play-clock value in [0, 40].

    Returns:
        Tuple of (list of readings, total OCR time)
    """
    logger.info("Running single-image OCR benchmark on %d frames...", len(frames_data))
    # NOTE(review): _get_easyocr_reader was removed during the template-matching
    # migration (see module note); this call fails until it is restored.
    reader = _get_easyocr_reader()
    readings: List[PlayClockReading] = []
    ocr_seconds = 0.0
    for frame in frames_data:
        started = time.perf_counter()
        hits = reader.readtext(frame.preprocessed_image, allowlist="0123456789", detail=1)
        ocr_seconds += time.perf_counter() - started
        # Keep the highest-confidence OCR hit, if any.
        if hits:
            top = max(hits, key=lambda hit: hit[2])
            text = top[1].strip()
            confidence = top[2]
        else:
            text = ""
            confidence = 0.0
        # Accept only integers in the valid play-clock range [0, 40].
        try:
            value = int(text) if text and 0 <= int(text) <= 40 else None
            detected = value is not None
        except ValueError:
            value, detected = None, False
        readings.append(PlayClockReading(detected=detected, value=value, confidence=confidence, raw_text=text))
    return readings, ocr_seconds
def benchmark_batch_readtext(frames_data: List[FrameData], batch_size: int = 16) -> Tuple[List[PlayClockReading], float]:
    """
    Benchmark batch OCR using EasyOCR's batch capability.

    EasyOCR still runs readtext per image, so this measures whether grouping
    frames into chunks of ``batch_size`` (timing each chunk as one unit and
    validating outside the timed section) yields any benefit.

    Returns:
        Tuple of (list of readings, total OCR time)
    """
    logger.info("Running batch OCR benchmark (batch_size=%d) on %d frames...", batch_size, len(frames_data))
    reader = _get_easyocr_reader()
    readings: List[PlayClockReading] = []
    ocr_seconds = 0.0
    for chunk_start in range(0, len(frames_data), batch_size):
        chunk = frames_data[chunk_start : chunk_start + batch_size]
        # Time the whole chunk as a single unit.
        started = time.perf_counter()
        raw_pairs = []
        for frame in chunk:
            hits = reader.readtext(frame.preprocessed_image, allowlist="0123456789", detail=1)
            if hits:
                top = max(hits, key=lambda hit: hit[2])
                raw_pairs.append((top[1].strip(), top[2]))
            else:
                raw_pairs.append(("", 0.0))
        ocr_seconds += time.perf_counter() - started
        # Validate outside the timed section so only OCR cost is measured.
        for text, confidence in raw_pairs:
            try:
                value = int(text) if text and 0 <= int(text) <= 40 else None
                detected = value is not None
            except ValueError:
                value, detected = None, False
            readings.append(PlayClockReading(detected=detected, value=value, confidence=confidence, raw_text=text))
    return readings, ocr_seconds
def _ocr_worker_single(preprocessed_image: np.ndarray) -> Tuple[str, float]:
    """Thread-pool worker: OCR one preprocessed image, returning (text, confidence)."""
    reader = _get_easyocr_reader()
    hits = reader.readtext(preprocessed_image, allowlist="0123456789", detail=1)
    if not hits:
        # No text found in the crop.
        return "", 0.0
    top = max(hits, key=lambda hit: hit[2])
    return top[1].strip(), top[2]
def benchmark_threaded_ocr(frames_data: List[FrameData], num_workers: int = 4) -> Tuple[List[PlayClockReading], float]:
    """
    Benchmark threaded OCR using ThreadPoolExecutor.

    Note: Due to Python GIL, this may not provide speedup for CPU-bound tasks.
    However, EasyOCR may release the GIL during inference. Timing covers
    submission through the last collected result.

    Returns:
        Tuple of (list of readings, total OCR time)
    """
    logger.info("Running threaded OCR benchmark (workers=%d) on %d frames...", num_workers, len(frames_data))
    # Warm the shared reader before spawning workers.
    _get_easyocr_reader()
    readings: List[PlayClockReading] = []
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Fan out one task per frame, then collect in submission order.
        pending = [pool.submit(_ocr_worker_single, frame.preprocessed_image) for frame in frames_data]
        for task in pending:
            text, confidence = task.result()
            try:
                value = int(text) if text and 0 <= int(text) <= 40 else None
                detected = value is not None
            except ValueError:
                value, detected = None, False
            readings.append(PlayClockReading(detected=detected, value=value, confidence=confidence, raw_text=text))
    elapsed = time.perf_counter() - started
    return readings, elapsed
def compute_accuracy(results: "List[PlayClockReading]") -> "Tuple[int, int, float]":
    """
    Compute the detection rate over a list of readings.

    Args:
        results: Readings to summarize; only each item's ``detected`` flag is read.

    Returns:
        Tuple of (detected count, total count, detection rate in percent).
        The rate is 0.0 for empty input instead of dividing by zero.
    """
    total = len(results)
    detected = sum(1 for r in results if r.detected)
    # Use float literals so the rate is a float in every branch, matching the
    # annotated return type (the original returned int 0 for empty input).
    rate = 100.0 * detected / total if total > 0 else 0.0
    return detected, total, rate
def main() -> int:
    """Run OCR batching benchmarks.

    Loads the testing-mode configs produced by ``python main.py --testing``,
    extracts and preprocesses frames from the test clip, benchmarks four OCR
    strategies (single-image baseline, batched, 2-thread, 4-thread), then logs
    a comparison table and a recommendation based on the fastest strategy.

    Returns:
        0 on success; 1 if config files are missing or no frames were extracted.
    """
    logger.info("=" * 60)
    logger.info("OCR Batching Benchmark")
    logger.info("=" * 60)
    # Load configs from testing mode output
    config_path = OUTPUT_DIR / "testing_config.json"
    playclock_config_path = OUTPUT_DIR / "testing_playclock_config.json"
    template_path = OUTPUT_DIR / "testing_template.png"
    # All three artifacts must exist; bail out with instructions otherwise.
    if not all(p.exists() for p in [config_path, playclock_config_path, template_path]):
        logger.error("Missing config files. Run 'python main.py --testing' first to generate them.")
        logger.error(" Expected: %s", config_path)
        logger.error(" Expected: %s", playclock_config_path)
        logger.error(" Expected: %s", template_path)
        return 1
    # Load configs
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)
    # Initialize detectors; the scorebug region is fixed from the saved config
    # rather than re-discovered.
    scorebug_detector = DetectScoreBug(template_path=str(template_path))
    fixed_region = (config["scorebug_x"], config["scorebug_y"], config["scorebug_width"], config["scorebug_height"])
    scorebug_detector.set_fixed_region(fixed_region)
    clock_reader = PlayClockRegionExtractor(region_config_path=str(playclock_config_path))
    # Extract and preprocess all frames
    logger.info("\n--- Frame Extraction Phase ---")
    frames_data, extract_timing = extract_frames_sequential(
        str(VIDEO_PATH),
        TESTING_START_TIME,
        TESTING_END_TIME,
        FRAME_INTERVAL,
        scorebug_detector,
        clock_reader,
    )
    logger.info("Extraction timing:")
    for k, v in extract_timing.items():
        logger.info(" %s: %.2fs", k, v)
    logger.info(" Total: %.2fs", sum(extract_timing.values()))
    if not frames_data:
        logger.error("No frames extracted!")
        return 1
    # Run benchmarks
    logger.info("\n--- OCR Benchmarks ---")
    # Benchmark 1: Single-image OCR (baseline)
    results_single, time_single = benchmark_single_ocr(frames_data)
    det_single, tot_single, rate_single = compute_accuracy(results_single)
    logger.info("\n[1] Single-image OCR:")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_single, 1000 * time_single / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_single, tot_single, rate_single)
    # Benchmark 2: Batch OCR (same reader, batched calls)
    results_batch, time_batch = benchmark_batch_readtext(frames_data, batch_size=16)
    det_batch, tot_batch, rate_batch = compute_accuracy(results_batch)
    logger.info("\n[2] Batch OCR (batch_size=16):")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_batch, 1000 * time_batch / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_batch, tot_batch, rate_batch)
    logger.info(" Speedup vs single: %.2fx", time_single / time_batch if time_batch > 0 else 0)
    # Benchmark 3: Threaded OCR (2 workers)
    results_thread2, time_thread2 = benchmark_threaded_ocr(frames_data, num_workers=2)
    det_thread2, tot_thread2, rate_thread2 = compute_accuracy(results_thread2)
    logger.info("\n[3] Threaded OCR (2 workers):")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_thread2, 1000 * time_thread2 / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_thread2, tot_thread2, rate_thread2)
    logger.info(" Speedup vs single: %.2fx", time_single / time_thread2 if time_thread2 > 0 else 0)
    # Benchmark 4: Threaded OCR (4 workers)
    results_thread4, time_thread4 = benchmark_threaded_ocr(frames_data, num_workers=4)
    det_thread4, tot_thread4, rate_thread4 = compute_accuracy(results_thread4)
    logger.info("\n[4] Threaded OCR (4 workers):")
    logger.info(" Time: %.2fs (%.1f ms/frame)", time_thread4, 1000 * time_thread4 / len(frames_data))
    logger.info(" Detection: %d/%d (%.1f%%)", det_thread4, tot_thread4, rate_thread4)
    logger.info(" Speedup vs single: %.2fx", time_single / time_thread4 if time_thread4 > 0 else 0)
    # Summary
    logger.info("\n" + "=" * 60)
    logger.info("SUMMARY")
    logger.info("=" * 60)
    logger.info("Frames processed: %d", len(frames_data))
    logger.info("\nOCR Method | Time (s) | ms/frame | Speedup | Det Rate")
    logger.info("-" * 60)
    logger.info("Single-image | %8.2f | %8.1f | %7.2fx | %6.1f%%", time_single, 1000 * time_single / len(frames_data), 1.0, rate_single)
    logger.info(
        "Batch (size=16) | %8.2f | %8.1f | %7.2fx | %6.1f%%", time_batch, 1000 * time_batch / len(frames_data), time_single / time_batch if time_batch > 0 else 0, rate_batch
    )
    logger.info(
        "Threaded (2 wkrs) | %8.2f | %8.1f | %7.2fx | %6.1f%%",
        time_thread2,
        1000 * time_thread2 / len(frames_data),
        time_single / time_thread2 if time_thread2 > 0 else 0,
        rate_thread2,
    )
    logger.info(
        "Threaded (4 wkrs) | %8.2f | %8.1f | %7.2fx | %6.1f%%",
        time_thread4,
        1000 * time_thread4 / len(frames_data),
        time_single / time_thread4 if time_thread4 > 0 else 0,
        rate_thread4,
    )
    logger.info("=" * 60)
    # Recommendations: pick the strategy with the smallest wall-clock total.
    # Ties resolve in listed order (single wins a tie with batch, etc.).
    best_time = min(time_single, time_batch, time_thread2, time_thread4)
    if best_time == time_single:
        logger.info("\nRecommendation: Keep single-image OCR (no speedup from batching)")
    elif best_time == time_batch:
        logger.info("\nRecommendation: Use batch OCR with batch_size=16")
    elif best_time == time_thread2:
        logger.info("\nRecommendation: Use threaded OCR with 2 workers")
    else:
        logger.info("\nRecommendation: Use threaded OCR with 4 workers")
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer status code to the shell
    # (raising SystemExit is exactly what sys.exit() does internally).
    raise SystemExit(main())