"""Ball-tracking / DRS-style review pipeline.

Detects a cricket ball in a video with a YOLO model, overlays trajectory,
speed and a bounce marker, renders an OUT / NOT OUT verdict, dubs a verdict
voice-over onto the clip, and plots a top-down predicted ball path.
"""
import cv2
import os
import math
import numpy as np
import tempfile
import matplotlib.pyplot as plt
from ultralytics import YOLO
from pydub import AudioSegment
import ffmpeg
from scipy.optimize import curve_fit

# Ball detector; class id 0 is assumed to be the ball.
model = YOLO("best.pt")
# Fallback frame rate when the container does not report one.
FPS = 30


def _temp_path(suffix):
    """Create a closed temporary file and return its path (caller owns it)."""
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)  # close the fd immediately; only the name is needed
    return path


def estimate_speed(p1, p2, fps, meters_per_pixel=0.03):
    """Estimate speed in km/h from two consecutive tracked positions.

    Args:
        p1, p2: (x, y) positions in consecutive frames.
        fps: frames per second of the source video.
        meters_per_pixel: calibration scale (default preserves the
            original hard-coded 0.03).

    Returns:
        Speed in km/h.

    NOTE(review): process_video passes points already projected to metres
    via a homography, yet a pixel scale is still applied here — the
    calibration looks inconsistent; confirm which space callers use.
    """
    dist = math.hypot(p2[0] - p1[0], p2[1] - p1[1])
    mps = dist * meters_per_pixel * fps
    return mps * 3.6


def add_voice_to_video(video_path, verdict):
    """Mux a verdict voice-over ("out.mp3" / "not_out.mp3") onto the video.

    The video stream is stream-copied; the audio is re-encoded to AAC.
    Returns the path of the dubbed mp4.

    Raises:
        ffmpeg.Error: if the ffmpeg invocation fails.
    """
    audio_file = "out.mp3" if verdict == "OUT" else "not_out.mp3"
    audio_temp = _temp_path(".mp3")
    AudioSegment.from_file(audio_file).export(audio_temp, format="mp3")

    final_output = _temp_path(".mp4")
    # BUGFIX: the previous `output(..., audio=audio_temp)` passed a bogus
    # `-audio` flag to ffmpeg instead of attaching a second input stream.
    # The documented ffmpeg-python way is to hand both streams to output().
    video_in = ffmpeg.input(video_path)
    audio_in = ffmpeg.input(audio_temp)
    (
        ffmpeg
        .output(
            video_in.video,
            audio_in.audio,
            final_output,
            vcodec='copy',
            acodec='aac',
            strict='experimental',
        )
        .run(overwrite_output=True)
    )
    return final_output


def extend_trajectory_with_rotation(points, bounce_idx, final_x=20.12):
    """Extrapolate the post-bounce path to the end of the pitch.

    Fits a quadratic to the pre-bounce points and extends it from the
    bounce to ``final_x`` metres, adding a small linear drift to mimic
    post-bounce deviation.

    Args:
        points: list of (x, y) positions in metres.
        bounce_idx: index of the bounce point within ``points`` (or None).
        final_x: pitch length in metres to extrapolate to.

    Returns:
        (x_list, y_list) — the original points, extended when a fit was
        possible, otherwise unchanged.
    """
    x_vals = [pt[0] for pt in points]
    y_vals = [pt[1] for pt in points]

    # Need a bounce with at least 2 points after it and 3 before it
    # (the quadratic has three parameters); otherwise return as-is.
    if bounce_idx is None or bounce_idx < 3 or bounce_idx >= len(points) - 2:
        return x_vals, y_vals

    x_pre = x_vals[:bounce_idx]
    y_pre = y_vals[:bounce_idx]

    def poly2(x, a, b, c):
        return a * x**2 + b * x + c

    try:
        popt, _ = curve_fit(poly2, x_pre, y_pre)
        x_post = np.linspace(x_vals[bounce_idx], final_x, 50)
        # Small progressive lateral shift to suggest post-bounce movement.
        curve_shift = np.linspace(0, 0.05, len(x_post))
        y_post = poly2(x_post, *popt) + curve_shift
        return x_vals + list(x_post), y_vals + list(y_post)
    except Exception:
        # Fit can fail on degenerate data; fall back to the raw trajectory.
        return x_vals, y_vals


def draw_top_down_trajectory(points, bounce_frame_idx, output_path):
    """Plot the top-down (pitch-plane) trajectory and save it as a PNG.

    Args:
        points: list of (x, y) positions in metres.
        bounce_frame_idx: index of the bounce point within ``points``.
        output_path: path of the processed video; the image path is
            derived from it by replacing ".mp4" with "_trajectory.png".

    Returns:
        The image path, or None when fewer than 4 points were tracked.
    """
    if len(points) < 4:
        return None

    x_vals = [pt[0] for pt in points]
    y_vals = [pt[1] for pt in points]
    x_ext, y_ext = extend_trajectory_with_rotation(points, bounce_frame_idx)

    plt.figure(figsize=(10, 3))
    plt.plot(x_ext, y_ext, 'r-', label='Predicted Trajectory')
    plt.scatter(x_vals, y_vals, c='blue', s=10, label='Detected Points')
    plt.axvline(x=17.68, color='gray', linestyle='--', label='Stumps (17.68m)')
    plt.title("Top-Down Predicted Ball Path")
    plt.xlabel("Pitch Length (m)")
    plt.ylabel("Lateral Movement (m)")
    plt.grid(True)
    plt.legend()

    image_path = output_path.replace(".mp4", "_trajectory.png")
    plt.savefig(image_path)
    plt.close()
    return image_path


def process_video(video_path):
    """Run detection, overlays and verdict on up to 4 seconds of video.

    Returns:
        (final_video_path, topdown_image_path_or_None)

    Raises:
        Exception: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise Exception("Video open failed")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or FPS

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    temp_out = _temp_path(".mp4")
    out = cv2.VideoWriter(temp_out, fourcc, fps, (width, height))

    max_frames = int(fps * 4)  # cap processing at ~4 seconds of footage
    frame_count = 0

    # Map an assumed pitch quadrilateral in the image to real pitch
    # coordinates (20.12 m x 3.05 m).
    src = np.array([
        [width * 0.3, height * 0.4],
        [width * 0.7, height * 0.4],
        [width * 0.7, height * 0.9],
        [width * 0.3, height * 0.9],
    ], dtype=np.float32)
    dst = np.array([
        [0, 0],
        [20.12, 0],
        [20.12, 3.05],
        [0, 3.05],
    ], dtype=np.float32)
    H, _ = cv2.findHomography(src, dst)

    def project_point(px, py):
        """Project an image pixel to pitch-plane metres via the homography."""
        pt = np.array([[[px, py]]], dtype=np.float32)
        return cv2.perspectiveTransform(pt, H)[0][0]

    trajectory, real_trajectory = [], []  # pixel-space / metre-space paths
    bounce_detected = False
    bounce_frame_idx = None  # index of the bounce point in real_trajectory
    prev_center = None
    verdict = "NOT OUT"

    while frame_count < max_frames:
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame)
        for box in results[0].boxes:
            if int(box.cls[0]) == 0:  # ball class
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)
                trajectory.append((cx, cy))
                x_m, y_m = project_point(cx, cy)
                real_trajectory.append((x_m, y_m))
                cv2.circle(frame, (cx, cy), 8, (0, 0, 255), -1)

                # Bounce heuristic: sudden downward jump of >15 px.
                if (prev_center is not None and not bounce_detected
                        and cy - prev_center[1] > 15):
                    bounce_detected = True
                    # BUGFIX: store the index within the trajectory lists,
                    # not the raw frame counter — they diverge whenever
                    # detection misses a frame, which previously made the
                    # bounce index point at the wrong (or a nonexistent)
                    # trajectory point downstream.
                    bounce_frame_idx = len(real_trajectory) - 1
                    cv2.putText(frame, "Bounce!", (cx, cy - 20),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
                prev_center = (cx, cy)
                break  # only track the first ball detection per frame

        # Draw the full pixel-space trajectory so far.
        for i in range(1, len(trajectory)):
            x1, y1 = trajectory[i - 1]
            x2, y2 = trajectory[i]
            cv2.line(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

        # Speed readout from the last two metre-space points.
        if len(real_trajectory) >= 2:
            x1, y1 = real_trajectory[-2]
            x2, y2 = real_trajectory[-1]
            speed = estimate_speed((x1, y1), (x2, y2), fps)
            cv2.putText(frame, f"Speed: {int(speed)} km/h", (30, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        # Verdict heuristic: after the bounce, a lateral position within
        # the 1.0–2.1 m band is treated as hitting the stumps.
        if bounce_detected and real_trajectory:
            x_latest, y_latest = real_trajectory[-1]
            if 1.0 < y_latest < 2.1:
                verdict = "OUT"

        cv2.putText(frame, f"Decision: {verdict}", (30, height - 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2,
                    (0, 255, 0) if verdict == "NOT OUT" else (0, 0, 255), 3)
        out.write(frame)
        frame_count += 1

    cap.release()
    out.release()

    topdown_image = draw_top_down_trajectory(
        real_trajectory, bounce_frame_idx, temp_out)

    try:
        final_video = add_voice_to_video(temp_out, verdict)
    except Exception:
        # Best effort: fall back to the silent video if dubbing fails
        # (e.g. the mp3 assets or ffmpeg are missing).
        final_video = temp_out

    return final_video, topdown_image