Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
Test script for the TrackPlayState state machine using synthetic clock sequences.

This script validates the state machine logic by feeding it simulated clock readings
and verifying that plays are detected correctly. It also tests the backward-counting
logic for determining play end times.

Usage:
    python scripts/test_state_machine.py

Tests:
    1. Basic play detection (clock reset to 40)
    2. Backward counting calculation (scorebug loss and recovery)
    3. Multiple consecutive plays
    4. Clock behavior validation at snap time (reset vs. freeze)
    5. Direct detection vs. backward calculation comparison
| """ | |
| import logging | |
| import sys | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import List, Tuple, Optional | |
| from tracking import TrackPlayState, PlayEvent | |
| from detection import ScorebugDetection | |
| from readers import PlayClockReading | |
# Configure root logging once for the script, then grab this module's logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@dataclass
class TestFrame:
    """Simulated frame data for testing.

    Instances are built positionally by the test sequences, e.g.
    ``TestFrame(6.0, True, 40)``, so the ``@dataclass`` decorator is required
    to generate the matching ``__init__``.
    """

    # Not a test case: stop pytest from trying to collect this "Test*"-named helper.
    __test__ = False

    timestamp: float  # Video timestamp in seconds
    scorebug_detected: bool  # Whether scorebug is visible
    clock_value: Optional[int]  # Play clock value (None if unreadable)
    clock_confidence: float = 0.9  # OCR confidence
def create_scorebug(detected: bool) -> ScorebugDetection:
    """Build a synthetic ScorebugDetection for the given detection flag.

    A detected scorebug gets confidence 0.95 and a fixed bounding box; an
    undetected one gets confidence 0.0 and no bounding box.
    """
    if detected:
        score, box = 0.95, (100, 50, 400, 100)
    else:
        score, box = 0.0, None
    return ScorebugDetection(detected=detected, confidence=score, bbox=box, method="test")
def create_clock_reading(value: Optional[int], confidence: float = 0.9) -> PlayClockReading:
    """Build a synthetic PlayClockReading.

    Args:
        value: Clock value to report, or None for an unreadable clock.
        confidence: Confidence to attach when a value is present.

    Returns:
        A reading marked as detected with the value's text, or an undetected
        reading with zero confidence and empty text when ``value`` is None.
    """
    readable = value is not None
    return PlayClockReading(
        detected=readable,
        value=value,
        confidence=confidence if readable else 0.0,
        raw_text=str(value) if readable else "",
    )
def run_test_sequence(
    name: str, frames: List[TestFrame], expected_plays: int
) -> Tuple[bool, List[PlayEvent]]:
    """
    Feed a sequence of frames through a fresh TrackPlayState and check the play count.

    Args:
        name: Test name for logging
        frames: List of TestFrame objects, processed in order
        expected_plays: Expected number of plays to detect

    Returns:
        Tuple of (success, detected_plays), where success is True when the
        number of detected plays equals expected_plays
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("TEST: %s", name)
    logger.info(banner)

    machine = TrackPlayState()
    plays = []
    for item in frames:
        # Turn the plain frame data into the detection/reading objects update() takes.
        event = machine.update(
            item.timestamp,
            create_scorebug(item.scorebug_detected),
            create_clock_reading(item.clock_value, item.clock_confidence),
        )
        if not event:
            continue
        plays.append(event)
        logger.info(
            " -> Play detected: #%d (%.1fs - %.1fs)",
            event.play_number,
            event.start_time,
            event.end_time,
        )

    # Compare the number of detected plays with the expectation.
    play_count = len(plays)
    matched = play_count == expected_plays
    logger.info("-" * 60)
    logger.info(
        "Result: %s (expected %d plays, got %d)",
        "PASS" if matched else "FAIL",
        expected_plays,
        play_count,
    )
    if plays:
        logger.info("Stats: %s", machine.get_stats())
    return matched, plays
def test_basic_play_detection() -> bool:
    """Check that a clock reset to 40 is reported as one play started by 'clock_reset'."""
    # Pre-snap countdown: 25 -> 20 over t = 0..5 s.
    frames = [TestFrame(float(t), True, 25 - t) for t in range(6)]
    # Play starts: the clock resets to 40 at t = 6 s.
    frames.append(TestFrame(6.0, True, 40))
    # Clock counts down for the next play: 39 -> 35 over t = 7..11 s.
    frames.extend(TestFrame(float(t), True, 46 - t) for t in range(7, 12))

    success, plays = run_test_sequence("Basic Play Detection (clock reset)", frames, expected_plays=1)
    if not plays:
        return success

    # The first play must have been started by the clock reset.
    method = plays[0].start_method
    if method != "clock_reset":
        logger.error("Expected start_method='clock_reset', got '%s'", method)
        return False
    return success
def test_backward_counting() -> bool:
    """Check the backward-counted play end time across a scorebug gap.

    Scenario: the clock resets to 40 (snap), the scorebug disappears (replay),
    and it returns at t = 15.0 s showing 35. The expected end time is
    15.0 - (40 - 35) = 10.0.
    """
    # Pre-snap countdown: 25 -> 20 over t = 0..5 s.
    frames = [TestFrame(float(t), True, 25 - t) for t in range(6)]
    # Play starts: the clock resets to 40.
    frames.append(TestFrame(6.0, True, 40))
    # Scorebug lost (broadcast showing replay) for t = 7..14 s.
    frames.extend(TestFrame(float(t), False, None) for t in range(7, 15))
    # Scorebug returns with the clock at 35, then 34, 33 for t = 15..17 s.
    frames.extend(TestFrame(float(t), True, 50 - t) for t in range(15, 18))

    success, plays = run_test_sequence("Backward Counting (scorebug loss)", frames, expected_plays=1)
    if not plays:
        return success

    first = plays[0]
    # The end of the play must come from the backward calculation.
    if first.end_method != "backward_calc":
        logger.error("Expected end_method='backward_calc', got '%s'", first.end_method)
        return False

    # 15.0 - (40 - 35) = 10.0, with a small tolerance.
    expected_end_time = 10.0
    if abs(first.end_time - expected_end_time) > 0.1:
        logger.error("Expected end_time=%.1f, got %.1f", expected_end_time, first.end_time)
        return False

    logger.info("Backward counting verified: end_time=%.1f (expected %.1f)", first.end_time, expected_end_time)
    return success
def test_multiple_plays() -> bool:
    """Check that two consecutive clock resets are reported as two plays."""
    # One reading per second, starting at t = 0.0 s.
    clock_values = [
        25, 24, 23,      # pre-snap for play 1
        40,              # play 1 starts
        39, 38,          # play 1 in progress, clock counting for next
        25, 24, 23,      # pre-snap for play 2
        40,              # play 2 starts
        39, 38, 37,      # play 2 in progress
    ]
    frames = [TestFrame(float(second), True, value) for second, value in enumerate(clock_values)]

    success, plays = run_test_sequence("Multiple Plays Detection", frames, expected_plays=2)
    if len(plays) == 2:
        first, second = plays
        logger.info("Play 1: %.1fs - %.1fs", first.start_time, first.end_time)
        logger.info("Play 2: %.1fs - %.1fs", second.start_time, second.end_time)
    return success
def test_clock_behavior_validation() -> bool:
    """
    Log how the state machine classifies the clock behavior at snap time.

    The sequence samples the clock every half second around a snap and the
    test logs whether the play start was attributed to:
    - a clock reset to 40, or
    - a clock freeze at its current value.

    Expected behavior: clock resets to 40
    """
    # Two samples per second starting at t = 0.0 s, so values repeat while the clock holds.
    clock_values = [
        15, 15, 14, 14, 13, 13, 12, 12, 11, 11, 10,  # countdown before the snap
        40, 40,                                      # snap occurs - clock resets to 40
        39, 39, 38,                                  # countdown resumes
    ]
    frames = [TestFrame(index * 0.5, True, value) for index, value in enumerate(clock_values)]

    success, plays = run_test_sequence("Clock Behavior Validation", frames, expected_plays=1)
    if not plays:
        return success

    first = plays[0]
    logger.info("Clock behavior: start_method=%s, start_clock=%s", first.start_method, first.start_clock_value)
    # Report which of the two candidate behaviors was detected, if either.
    if first.start_method == "clock_reset":
        logger.info("CONFIRMED: Clock resets to 40 at snap")
    elif first.start_method == "clock_freeze":
        logger.info("OBSERVED: Clock freezes at snap")
    return success
def test_direct_vs_backward_comparison() -> bool:
    """Log the end method and end times for a play whose reset to 40 is directly visible.

    The pass/fail result depends only on the number of detected plays; the
    end-time details are logged for inspection, not asserted.
    """
    # One reading per second, starting at t = 0.0 s.
    clock_values = [
        20, 19, 18,   # pre-snap countdown
        40,           # play starts
        40,           # clock still at 40 (direct detection possible)
        39, 38, 37,   # clock starts ticking (backward calc should match)
    ]
    frames = [TestFrame(float(second), True, value) for second, value in enumerate(clock_values)]

    success, plays = run_test_sequence("Direct vs Backward Comparison", frames, expected_plays=1)
    if not plays:
        return success

    first = plays[0]
    logger.info("End method: %s", first.end_method)
    logger.info("End time: %.1f", first.end_time)
    # NOTE(review): this truthiness check also skips a direct end time of 0.0 - confirm intended.
    if first.direct_end_time:
        logger.info("Direct end time: %.1f", first.direct_end_time)
        gap = abs(first.end_time - first.direct_end_time)
        logger.info("Difference: %.1f seconds", gap)
    return success
def main() -> int:
    """Run all state machine tests and log a summary.

    Each test runs in isolation: a test that raises is recorded as failed
    (with its traceback logged) and the remaining tests still run.

    Returns:
        0 if every test passed, 1 otherwise (suitable as a process exit code).
    """
    logger.info("TrackPlayState Test Suite")
    logger.info("=" * 60)

    tests = [
        ("Basic Play Detection", test_basic_play_detection),
        ("Backward Counting", test_backward_counting),
        ("Multiple Plays", test_multiple_plays),
        ("Clock Behavior Validation", test_clock_behavior_validation),
        ("Direct vs Backward Comparison", test_direct_vs_backward_comparison),
    ]

    results = []
    for name, test_func in tests:
        try:
            passed = test_func()
        except Exception as e:  # pylint: disable=broad-exception-caught
            # logger.exception logs at ERROR level and appends the traceback,
            # so a crashing test can be diagnosed from the log alone.
            logger.exception("Test '%s' raised exception: %s", name, e)
            passed = False
        results.append((name, passed))

    # Summary
    logger.info("")
    logger.info("=" * 60)
    logger.info("TEST SUMMARY")
    logger.info("=" * 60)
    for name, passed in results:
        logger.info(" %s: %s", name, "PASS" if passed else "FAIL")

    total = len(results)
    passed_count = sum(1 for _, passed in results if passed)
    logger.info("-" * 60)
    logger.info("Total: %d/%d passed", passed_count, total)
    return 0 if passed_count == total else 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())