Spaces:
Sleeping
Sleeping
"""
Test accurate frame extraction using ffmpeg.

This script demonstrates that ffmpeg correctly handles variable frame rate
(VFR) video timestamps while OpenCV does not.

Usage:
    python scripts/test_ffmpeg_frame_extraction.py
"""
| import json | |
| import logging | |
| import os | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import cv2 | |
| import numpy as np | |
| # Add src to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / "src")) | |
| from readers import ReadPlayClock | |
| from setup import DigitTemplateLibrary | |
| from detection.scorebug import DetectScoreBug | |
# Script-wide logging setup: INFO level with "<time> - <level> - <message>" lines.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
def load_texas_config() -> Dict[str, Any]:
    """Load the saved config for Texas video.

    The file ``output/OSU_vs_Texas_01_10_25_config.json`` is resolved relative
    to the current working directory.

    Returns:
        The parsed JSON content of the config file.

    Raises:
        FileNotFoundError: If the config file does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    config_path = Path("output/OSU_vs_Texas_01_10_25_config.json")
    # JSON is UTF-8 by specification; do not depend on the platform's locale encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        return json.load(f)
def extract_frame_ffmpeg(video_path: str, timestamp: float) -> Optional[np.ndarray]:
    """
    Extract a single frame using ffmpeg for accurate VFR handling.

    ffmpeg writes the frame to a temporary PNG file, which is loaded with
    ``cv2.imread`` and then deleted again.

    Args:
        video_path: Path to video file
        timestamp: Time in seconds

    Returns:
        Frame as numpy array (BGR), or None on failure (ffmpeg could not be
        started, timed out, exited with a non-zero status, or the image it
        wrote could not be read back).
    """
    # Reserve a unique output path. The handle is closed right away so ffmpeg
    # can write to the path itself; delete=False means cleanup is done below.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
        tmp_path = tmp.name

    try:
        # Use ffmpeg with accurate seeking (-ss before -i for fast seek)
        cmd = [
            "ffmpeg",
            # Global options go before any input/output. -y is required because
            # the temporary output file already exists and must be overwritten.
            "-y",
            "-loglevel",
            "error",
            "-ss",
            str(timestamp),
            "-i",
            str(video_path),
            "-frames:v",
            "1",
            "-q:v",
            "2",
            tmp_path,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=30, check=False)
        except subprocess.TimeoutExpired:
            logger.error("ffmpeg timed out extracting frame at %.3fs", timestamp)
            return None
        except OSError as exc:
            # Raised when the ffmpeg executable is missing or cannot be run.
            logger.error("ffmpeg could not be started: %s", exc)
            return None

        if result.returncode != 0:
            # stderr is not guaranteed to be valid UTF-8; never fail while reporting.
            logger.error("ffmpeg failed: %s", result.stderr.decode(errors="replace"))
            return None

        # cv2.imread returns None when the file cannot be decoded, which
        # matches this function's "None on failure" contract.
        return cv2.imread(tmp_path)
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def extract_frame_opencv(video_path: str, timestamp: float) -> Tuple[Optional[np.ndarray], float]:
    """
    Extract a frame using OpenCV (for comparison).

    The target frame index is computed as ``int(timestamp * fps)`` from the FPS
    value OpenCV reports, and the capture is positioned on that index before
    reading.

    Args:
        video_path: Path to video file
        timestamp: Time in seconds

    Returns tuple of (frame, actual_timestamp), where actual_timestamp is the
    position in seconds that OpenCV reports after the read. Returns
    (None, -1.0) if the video cannot be opened, reports no usable FPS, or the
    read fails.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None, -1.0

        fps = cap.get(cv2.CAP_PROP_FPS)
        # Written as "not fps > 0" so that NaN is rejected along with zero and
        # negative values; none of them yields a meaningful frame index.
        if not fps > 0:
            return None, -1.0

        frame_num = int(timestamp * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
        ret, frame = cap.read()
        actual_ts = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
        return (frame, actual_ts) if ret else (None, -1.0)
    finally:
        # Always free the capture handle, including on early return or error.
        cap.release()
def extract_game_clock_region(frame: np.ndarray, config: Dict[str, Any]) -> np.ndarray:
    """Crop the game clock (not the play clock) out of a frame for verification.

    Args:
        frame: Image array indexed as ``frame[row, column, ...]``.
        config: Mapping providing ``scorebug_x`` and ``scorebug_y``.

    Returns:
        A copy of the 60-wide by 20-high crop located 560 columns and 16 rows
        past the scorebug origin. The crop is truncated (possibly empty) when
        it extends beyond the frame.
    """
    # Fixed crop geometry relative to the scorebug origin, chosen by visual
    # inspection of the Texas video, where the game clock shows MM:SS.
    offset_x, offset_y = 560, 16
    width, height = 60, 20

    left = config["scorebug_x"] + offset_x
    top = config["scorebug_y"] + offset_y

    region = frame[top : top + height, left : left + width]
    # Copy so the caller can modify the crop without touching the source frame.
    return region.copy()
def _count_countdown_violations(clocks: List[int]) -> int:
    """Count adjacent readings that increase without looking like a clock reset.

    An increase is tolerated only when the earlier reading is below 10 and the
    later one is at least 35 (the clock going from a low value back to 40).
    Fewer than two readings give 0 violations.

    Args:
        clocks: Play clock readings in extraction order (presumably integers;
            confirm against the reader's result type).

    Returns:
        Number of adjacent pairs that break the countdown pattern.
    """
    violations = 0
    for previous, current in zip(clocks, clocks[1:]):
        looks_like_reset = previous < 10 and current >= 35
        if current > previous and not looks_like_reset:
            violations += 1
    return violations


def main() -> None:
    """Test ffmpeg vs OpenCV frame extraction.

    For a fixed list of timestamps, extracts the frame with both methods, reads
    the play clock from each frame, logs the readings, saves both frames under
    ``output/debug/ffmpeg_extraction_test`` and finally compares how well each
    sequence of readings follows a countdown.
    """
    config = load_texas_config()
    video_path = config["video_path"]

    logger.info("=" * 80)
    logger.info("FFMPEG VS OPENCV FRAME EXTRACTION TEST")
    logger.info("=" * 80)
    logger.info("Video: %s", video_path)
    logger.info("")

    # Initialize play clock reader for verification
    template_dir = "output/debug/digit_templates"
    library = DigitTemplateLibrary()
    library.load(template_dir)

    pc_w, pc_h = config["playclock_width"], config["playclock_height"]
    reader = ReadPlayClock(library, region_width=pc_w, region_height=pc_h)

    # (x, y, width, height) of the play clock; offsets are relative to the scorebug.
    playclock_coords = (
        config["scorebug_x"] + config["playclock_x_offset"],
        config["scorebug_y"] + config["playclock_y_offset"],
        config["playclock_width"],
        config["playclock_height"],
    )

    # Test timestamps in the problem area
    test_timestamps = [5928.4, 5929.4, 5930.4, 5931.4, 5932.4, 5933.4, 5934.4]

    output_dir = Path("output/debug/ffmpeg_extraction_test")
    output_dir.mkdir(parents=True, exist_ok=True)

    logger.info("Testing frame extraction at timestamps:")
    logger.info("-" * 40)

    results = []
    for ts in test_timestamps:
        # Extract with ffmpeg
        ffmpeg_frame = extract_frame_ffmpeg(video_path, ts)
        # Extract with OpenCV
        opencv_frame, opencv_actual_ts = extract_frame_opencv(video_path, ts)

        if ffmpeg_frame is None or opencv_frame is None:
            logger.warning(" %.1fs: Extraction failed", ts)
            continue

        # Read play clock from both
        ffmpeg_clock = reader.read_from_fixed_location(ffmpeg_frame, playclock_coords, padding=10)
        opencv_clock = reader.read_from_fixed_location(opencv_frame, playclock_coords, padding=10)

        # Resolve each reading once so the results, the log line and the file
        # names agree. None means "not detected"; a reading of 0 is a real
        # value and must not be treated as missing.
        ffmpeg_value = ffmpeg_clock.value if ffmpeg_clock.detected else None
        opencv_value = opencv_clock.value if opencv_clock.detected else None

        # Compare
        opencv_error = opencv_actual_ts - ts
        results.append(
            {
                "target_ts": ts,
                "opencv_actual_ts": opencv_actual_ts,
                "opencv_error": opencv_error,
                "ffmpeg_clock": ffmpeg_value,
                "opencv_clock": opencv_value,
            }
        )

        logger.info(
            " t=%.1fs: FFmpeg clock=%s, OpenCV clock=%s (actual=%.1fs, err=%+.1fs)",
            ts,
            "N/A" if ffmpeg_value is None else ffmpeg_value,
            "N/A" if opencv_value is None else opencv_value,
            opencv_actual_ts,
            opencv_error,
        )

        # Save frames for visual comparison
        ffmpeg_label = "X" if ffmpeg_value is None else ffmpeg_value
        opencv_label = "X" if opencv_value is None else opencv_value
        cv2.imwrite(str(output_dir / f"ffmpeg_t{ts:.1f}_clock{ffmpeg_label}.png"), ffmpeg_frame)
        cv2.imwrite(
            str(output_dir / f"opencv_t{ts:.1f}_actual{opencv_actual_ts:.1f}_clock{opencv_label}.png"),
            opencv_frame,
        )

    logger.info("")
    logger.info("ANALYSIS")
    logger.info("-" * 40)

    # Check if ffmpeg frames show proper clock progression
    ffmpeg_clocks = [r["ffmpeg_clock"] for r in results if r["ffmpeg_clock"] is not None]
    opencv_clocks = [r["opencv_clock"] for r in results if r["opencv_clock"] is not None]

    logger.info("FFmpeg play clock sequence: %s", ffmpeg_clocks)
    logger.info("OpenCV play clock sequence: %s", opencv_clocks)

    # Check for proper countdown (values should generally decrease or reset at 40)
    ffmpeg_violations = _count_countdown_violations(ffmpeg_clocks)
    opencv_violations = _count_countdown_violations(opencv_clocks)

    logger.info("")
    logger.info("FFmpeg countdown violations: %d", ffmpeg_violations)
    logger.info("OpenCV countdown violations: %d", opencv_violations)

    if len(ffmpeg_clocks) < 2 or len(opencv_clocks) < 2:
        # With fewer than two readings there is no adjacent pair to check, so
        # zero violations says nothing about either method.
        logger.info("")
        logger.info("CONCLUSION: Not enough play clock readings to compare the two methods")
    elif ffmpeg_violations < opencv_violations:
        logger.info("")
        logger.info("CONCLUSION: FFmpeg extraction produces correct chronological order!")
        logger.info("RECOMMENDATION: Use ffmpeg for frame extraction instead of OpenCV seeking")
    elif ffmpeg_violations == opencv_violations == 0:
        logger.info("")
        logger.info("CONCLUSION: Both methods appear correct in this sample")
        logger.info("(May need more samples to see the difference)")
    else:
        logger.info("")
        logger.info("CONCLUSION: Both methods show issues - may need re-encoding")

    logger.info("")
    logger.info("Debug frames saved to: %s", output_dir)
# Run the comparison only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()