cfb40 / scripts /investigate_missed_plays.py
andytaylor-smg's picture
moving stuff all around
6c65498
#!/usr/bin/env python3
"""
Investigate missed plays by analyzing clock readings around each one.
For each missed play, this script captures:
1. Clock readings in the 30s before play start (pre-play countdown)
2. Clock readings during the play (play duration)
3. Clock readings in the 30s after play end (post-play countdown)
This helps understand WHY plays were missed - was it:
- Missing clock readings (template matching failed)?
- State machine logic issues?
- Timing/alignment issues?
Usage:
cd /Users/andytaylor/Documents/Personal/cfb40
source .venv/bin/activate
python scripts/investigate_missed_plays.py
"""
import json
import logging
from pathlib import Path
from typing import List, Dict, Tuple, Optional
import cv2
from setup import DigitTemplateLibrary
from readers import ReadPlayClock
# Log at INFO level with a timestamp and level prefix on every message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Configuration
# All paths are relative, so they resolve against the current working
# directory (the module docstring's usage notes run the script from the
# project root).
# Video that main() hands to analyze_missed_play() for frame sampling.
VIDEO_PATH = "full_videos/OSU vs Tenn 12.21.24.mkv"
# Location passed to DigitTemplateLibrary.load() in main().
DIGIT_TEMPLATE_PATH = "output/debug/digit_templates"
# JSON file whose "plays" list is read by load_v3_baseline().
V3_BASELINE_PATH = "output/benchmarks/v3_special_plays_baseline.json"
# JSON file whose "plays" list is read by load_detected_plays().
FAST_EVAL_PATH = "output/benchmarks/fast_template_evaluation.json"
# JSON file providing "x_offset", "y_offset", "width" and "height" for
# load_playclock_config().
PLAYCLOCK_CONFIG_PATH = "output/OSU_vs_Tenn_12_21_24_playclock_config.json"
# get_absolute_playclock_coords() uses the first two entries as the x/y origin
# that the play clock offsets are added to; the last two are not read in this
# file (presumably width and height - TODO confirm).
SCOREBUG_REGION = (128, 975, 1669, 46)
# NOTE(review): not referenced anywhere in the visible code. It matches the
# 0.5 default `interval` of collect_clock_readings() and the hard-coded 0.5
# offsets in analyze_missed_play() - presumably seconds between samples.
FRAME_INTERVAL = 0.5
def load_playclock_config() -> Tuple[int, int, int, int]:
    """Read the play clock region from the JSON file at PLAYCLOCK_CONFIG_PATH.

    Returns:
        The config's ``x_offset``, ``y_offset``, ``width`` and ``height``
        values, in that order.

    Raises:
        KeyError: if any of those four keys is absent from the config.
    """
    with open(PLAYCLOCK_CONFIG_PATH, "r", encoding="utf-8") as config_file:
        settings = json.load(config_file)
    x_offset = settings["x_offset"]
    y_offset = settings["y_offset"]
    width = settings["width"]
    height = settings["height"]
    return (x_offset, y_offset, width, height)
def get_absolute_playclock_coords() -> Tuple[int, int, int, int]:
    """Translate the configured play clock offsets into frame coordinates.

    The x/y offsets returned by ``load_playclock_config()`` are added to the
    first two entries of ``SCOREBUG_REGION``; the width and height are passed
    through unchanged.

    Returns:
        ``(x, y, width, height)`` of the play clock region.
    """
    # Only the origin of the scorebug region matters here; its last two
    # entries are unpacked and discarded.
    origin_x, origin_y, _unused_a, _unused_b = SCOREBUG_REGION
    offset_x, offset_y, width, height = load_playclock_config()
    absolute_x = origin_x + offset_x
    absolute_y = origin_y + offset_y
    return (absolute_x, absolute_y, width, height)
def load_v3_baseline() -> List[Dict]:
    """Return the ``"plays"`` list from the JSON file at V3_BASELINE_PATH.

    An empty list is returned when the file has no ``"plays"`` key.
    """
    with open(V3_BASELINE_PATH, "r", encoding="utf-8") as baseline_file:
        payload = json.load(baseline_file)
    return payload.get("plays", [])
def load_detected_plays() -> List[Dict]:
    """Return the ``"plays"`` list from the JSON file at FAST_EVAL_PATH.

    An empty list is returned when the file has no ``"plays"`` key.
    """
    with open(FAST_EVAL_PATH, "r", encoding="utf-8") as evaluation_file:
        payload = json.load(evaluation_file)
    return payload.get("plays", [])
def find_missed_plays(baseline: List[Dict], detected: List[Dict], tolerance: float = 5.0) -> List[Dict]:
    """Return the baseline plays that have no detected play starting nearby.

    A baseline play counts as detected when at least one entry of ``detected``
    has a ``start_time`` within ``tolerance`` (inclusive) of the baseline
    play's ``start_time``. A missing ``start_time`` key is treated as 0 on
    either side.

    Args:
        baseline: Reference plays, each a dict that may carry ``start_time``.
        detected: Plays found by the detector, same shape as ``baseline``.
        tolerance: Largest allowed absolute difference between start times.

    Returns:
        The unmatched baseline play dicts, in their original order.
    """
    # Read each detected start time once instead of once per baseline play.
    detected_starts = [play.get("start_time", 0) for play in detected]
    return [
        play
        for play in baseline
        if not any(abs(start - play.get("start_time", 0)) <= tolerance for start in detected_starts)
    ]
def collect_clock_readings(
    video_path: str,
    template_reader: ReadPlayClock,
    playclock_coords: Tuple[int, int, int, int],
    start_time: float,
    end_time: float,
    interval: float = 0.5,
) -> List[Tuple[float, Optional[int], float]]:
    """
    Sample the play clock at fixed time steps across a section of the video.

    For every timestamp from ``start_time`` to ``end_time`` (inclusive) in
    steps of ``interval``, the matching frame is fetched and passed to
    ``template_reader.read_from_fixed_location`` together with
    ``playclock_coords``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        template_reader: Reader used to extract the clock value from a frame.
        playclock_coords: Region forwarded unchanged to the reader.
        start_time: First timestamp to sample.
        end_time: Last timestamp to sample; an empty list is returned when it
            is smaller than ``start_time``.
        interval: Step between samples; must be positive.

    Returns:
        One ``(timestamp, clock_value or None, confidence)`` tuple per sample.
        The value is None when the reader did not report a detection; both
        value and confidence are ``None``/``0.0`` when no frame could be read.

    Raises:
        ValueError: if ``interval`` is not positive (the loop would never end).
    """
    if interval <= 0:
        raise ValueError(f"interval must be positive, got {interval!r}")

    readings: List[Tuple[float, Optional[int], float]] = []
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Without a positive frame rate every timestamp would map to frame 0,
        # producing readings that look valid but all come from one frame.
        can_seek = cap.isOpened() and fps > 0
        if not can_seek:
            logger.warning(
                "Cannot sample %s (opened=%s, fps=%s); recording every sample as unread",
                video_path,
                cap.isOpened(),
                fps,
            )

        step = 0
        current_time = start_time
        while current_time <= end_time:
            ret, frame = False, None
            if can_seek:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(current_time * fps))
                ret, frame = cap.read()

            if ret:
                result = template_reader.read_from_fixed_location(frame, playclock_coords)
                # A value is only trusted when the reader flags a detection.
                value = result.value if result.detected else None
                readings.append((current_time, value, result.confidence))
            else:
                readings.append((current_time, None, 0.0))

            # Derive each timestamp from the start instead of accumulating,
            # so float rounding error cannot build up over long ranges.
            step += 1
            current_time = start_time + step * interval
    finally:
        # Release the capture even if the reader raises part-way through.
        cap.release()
    return readings
def format_readings(readings: List[Tuple[float, Optional[int], float]]) -> str:
    """Render a sequence of readings as one compact line.

    Each reading contributes its clock value, or ``X`` when the value is
    None; timestamps and confidences are ignored. Entries are joined with
    `` → ``.

    Args:
        readings: ``(timestamp, clock_value or None, confidence)`` tuples.

    Returns:
        The joined string, or an empty string for no readings.
    """
    # Compare against None explicitly: a clock value of 0 is a real reading.
    return " → ".join("X" if value is None else f"{value}" for _, value, _ in readings)
def analyze_missed_play(
    play: Dict,
    template_reader: ReadPlayClock,
    playclock_coords: Tuple[int, int, int, int],
    video_path: str,
) -> Dict:
    """Gather clock readings before, during and after one missed play.

    The play's ``start_time`` defaults to 0 and its ``end_time`` to ten past
    the start. Readings are collected for three windows: the 30 before the
    start (clamped at 0), the play itself, and the 30 after the end.

    Returns:
        A dict with the play's start, end and duration plus, for each of the
        ``pre``/``during``/``post`` windows, the raw readings, the fraction of
        readings with a value (0 when the window has no readings) and the
        formatted reading string.
    """

    def fraction_read(samples):
        # Share of samples carrying a clock value; 0 for an empty window.
        if not samples:
            return 0
        hits = len([value for _, value, _ in samples if value is not None])
        return hits / len(samples)

    start_time = play.get("start_time", 0)
    end_time = play.get("end_time", start_time + 10)

    # The pre and post windows stop/start 0.5 away from the play boundaries
    # so the boundary timestamps are only sampled by the "during" window.
    window_open = max(0, start_time - 30)
    window_close = end_time + 30

    before = collect_clock_readings(video_path, template_reader, playclock_coords, window_open, start_time - 0.5)
    within = collect_clock_readings(video_path, template_reader, playclock_coords, start_time, end_time)
    after = collect_clock_readings(video_path, template_reader, playclock_coords, end_time + 0.5, window_close)

    summary = {
        "start_time": start_time,
        "end_time": end_time,
        "duration": end_time - start_time,
        "pre_readings": before,
        "during_readings": within,
        "post_readings": after,
    }
    summary["pre_success_rate"] = fraction_read(before)
    summary["during_success_rate"] = fraction_read(within)
    summary["post_success_rate"] = fraction_read(after)
    summary["pre_formatted"] = format_readings(before)
    summary["during_formatted"] = format_readings(within)
    summary["post_formatted"] = format_readings(after)
    return summary
def _format_timestamp(seconds: float) -> str:
    """Render a time offset as ``M:SS.ss``.

    The value is rounded to hundredths before being split into minutes and
    seconds, so 119.996 becomes ``2:00.00`` rather than ``1:60.00``.
    """
    minutes, hundredths = divmod(round(seconds * 100), 6000)
    return f"{minutes}:{hundredths / 100:05.2f}"


def _detected_values(readings: List[Tuple[float, Optional[int], float]]) -> List[int]:
    """Return the clock values of the readings that have one."""
    return [value for _, value, _ in readings if value is not None]


def _reading_section(
    heading: str,
    success_rate: float,
    readings: List[Tuple[float, Optional[int], float]],
    formatted: str,
) -> List[str]:
    """Build the markdown lines for one window: heading, rate and code fence."""
    detected = len(_detected_values(readings))
    return [
        heading,
        f"Success rate: {success_rate*100:.0f}% ({detected}/{len(readings)} readings)",
        "",
        "```",
        formatted,
        "```",
        "",
    ]


def _analysis_notes(analysis: Dict) -> List[str]:
    """Build the bullet points flagging notable patterns for one play."""
    notes = []
    if analysis["pre_success_rate"] < 0.5:
        notes.append("- ⚠️ Low pre-play detection rate - template matching may be failing")
    if analysis["during_success_rate"] < 0.5:
        notes.append("- ⚠️ Low during-play detection rate - likely the cause of missed play")
    if analysis["post_success_rate"] < 0.5:
        notes.append("- ⚠️ Low post-play detection rate")

    pre_vals = _detected_values(analysis["pre_readings"])
    during_vals = _detected_values(analysis["during_readings"])
    post_vals = _detected_values(analysis["post_readings"])

    # The reset check is only reported when something was read during the play.
    if during_vals:
        if 40 in during_vals or 25 in during_vals:
            notes.append("- ✓ Clock reset (40 or 25) detected during play period")
        else:
            notes.append("- ❌ No clock reset (40 or 25) detected during play period")

    # Single-digit values (which we have fewer templates for)
    single_digits = {v for v in pre_vals + during_vals + post_vals if v < 10}
    if single_digits:
        # Sorted so the same data always yields the same report text.
        digits = ", ".join(str(v) for v in sorted(single_digits))
        notes.append(f"- ℹ️ Single-digit values detected: {{{digits}}}")
    return notes


def generate_markdown_report(analyses: List[Dict], output_path: str) -> None:
    """Write a markdown report of the missed-play investigation.

    The report has a fixed introduction, a table of per-play detection
    success rates, and one detailed section per play with the formatted
    readings for each window and automatically derived analysis notes.

    Args:
        analyses: Result dicts as returned by ``analyze_missed_play``.
        output_path: File to write (UTF-8). Its directory must already exist.
    """
    lines = [
        "# Missed Plays Investigation Report",
        "",
        "## Summary",
        "",
        f"This report analyzes the first {len(analyses)} missed plays to understand why template matching",
        "failed to detect them. For each play, we examine:",
        "",
        "1. **Pre-play countdown** (30s before play start) - should show clock ticking down to reset point",
        "2. **During play** - should show clock at 40 (or 25) then counting down",
        "3. **Post-play countdown** (30s after play end) - should show clock continuing to count down",
        "",
        "**Legend:** Numbers = detected clock values, X = failed to detect",
        "",
        "---",
        "",
    ]

    # Summary table
    lines.extend(
        [
            "## Detection Success Rates",
            "",
            "| Play # | Time | Duration | Pre-Play | During | Post-Play |",
            "|--------|------|----------|----------|--------|-----------|",
        ]
    )
    for number, a in enumerate(analyses, start=1):
        lines.append(
            f"| {number} | {_format_timestamp(a['start_time'])} | {a['duration']:.1f}s | "
            f"{a['pre_success_rate']*100:.0f}% | "
            f"{a['during_success_rate']*100:.0f}% | "
            f"{a['post_success_rate']*100:.0f}% |"
        )
    lines.extend(["", "---", ""])

    # Detailed analysis for each play
    for number, a in enumerate(analyses, start=1):
        t = a["start_time"]
        lines.extend(
            [
                f"## Play {number}: {_format_timestamp(t)} ({t:.1f}s)",
                "",
                f"**Baseline:** start={a['start_time']:.1f}s, end={a['end_time']:.1f}s, duration={a['duration']:.1f}s",
                "",
            ]
        )
        lines.extend(
            _reading_section(
                "### Pre-Play Countdown (30s before start)",
                a["pre_success_rate"],
                a["pre_readings"],
                a["pre_formatted"],
            )
        )
        lines.extend(
            _reading_section(
                "### During Play",
                a["during_success_rate"],
                a["during_readings"],
                a["during_formatted"],
            )
        )
        lines.extend(
            _reading_section(
                "### Post-Play Countdown (30s after end)",
                a["post_success_rate"],
                a["post_readings"],
                a["post_formatted"],
            )
        )
        lines.append("### Analysis Notes")
        lines.extend(_analysis_notes(a))
        lines.extend(["", "---", ""])

    # Write report
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    logger.info("Report saved to: %s", output_path)
def main():
    """Run the missed-play investigation and write the markdown report.

    Steps: load the baseline and detected plays, work out which baseline
    plays were missed, analyse the clock readings around the first few of
    them, write ``docs/missed_plays_investigation.md`` and log the average
    detection rates. Returns early when nothing was missed.

    Raises:
        FileNotFoundError: if the video at VIDEO_PATH does not exist.
    """
    max_plays = 10  # upper bound on how many missed plays get analysed
    banner = "=" * 70

    logger.info(banner)
    logger.info("INVESTIGATING MISSED PLAYS")
    logger.info(banner)

    # Load data
    logger.info("\n[Step 1] Loading baseline and detected plays...")
    baseline = load_v3_baseline()
    detected = load_detected_plays()
    logger.info("Baseline plays: %d", len(baseline))
    logger.info("Detected plays: %d", len(detected))

    # Find missed plays
    missed = find_missed_plays(baseline, detected)
    logger.info("Missed plays: %d", len(missed))
    if not missed:
        logger.info("No missed plays to investigate!")
        return

    # Fail fast: with an unreadable video every sample is recorded as a failed
    # reading, and the report would wrongly blame template matching.
    if not Path(VIDEO_PATH).is_file():
        raise FileNotFoundError(f"Video file not found: {VIDEO_PATH}")

    # Load template reader
    logger.info("\n[Step 2] Loading template reader...")
    template_library = DigitTemplateLibrary()
    template_library.load(DIGIT_TEMPLATE_PATH)
    playclock_coords = get_absolute_playclock_coords()
    pc_w, pc_h = playclock_coords[2], playclock_coords[3]
    template_reader = ReadPlayClock(template_library, region_width=pc_w, region_height=pc_h)

    # Analyze the first few missed plays; log how many that actually is.
    selected = missed[:max_plays]
    logger.info("\n[Step 3] Analyzing first %d missed plays...", len(selected))
    analyses = []
    for number, play in enumerate(selected, start=1):
        t = play.get("start_time", 0)
        # Round before splitting so 119.996 logs as 2:00.00, not 1:60.00.
        minutes, hundredths = divmod(round(t * 100), 6000)
        logger.info(" Analyzing play %d at %d:%05.2f...", number, minutes, hundredths / 100)
        analyses.append(analyze_missed_play(play, template_reader, playclock_coords, VIDEO_PATH))

    # Generate report
    logger.info("\n[Step 4] Generating markdown report...")
    output_path = "docs/missed_plays_investigation.md"
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    generate_markdown_report(analyses, output_path)

    def average(key):
        # analyses is non-empty here because `missed` was non-empty above.
        return sum(a[key] for a in analyses) / len(analyses)

    # Print summary
    logger.info("\n%s", banner)
    logger.info("SUMMARY")
    logger.info(banner)
    logger.info("Average detection rates across %d missed plays:", len(analyses))
    logger.info(" Pre-play: %.1f%%", average("pre_success_rate") * 100)
    logger.info(" During: %.1f%%", average("during_success_rate") * 100)
    logger.info(" Post-play: %.1f%%", average("post_success_rate") * 100)
    logger.info("\nReport saved to: %s", output_path)


if __name__ == "__main__":
    main()