Spaces:
Runtime error
Runtime error
| import cv2 | |
| import os | |
| import math | |
| import numpy as np | |
| import tempfile | |
| import matplotlib.pyplot as plt | |
| from ultralytics import YOLO | |
| from pydub import AudioSegment | |
| import ffmpeg | |
| from scipy.optimize import curve_fit | |
# Detection model, loaded once at import time from the local "best.pt" weights
# file and reused by process_video for every frame.
model = YOLO("best.pt")
# Fallback frame rate, used by process_video when the video does not report one.
FPS = 30
def estimate_speed(p1, p2, fps, meters_per_pixel=0.03):
    """Estimate the speed, in km/h, of an object that moved from p1 to p2.

    The two positions are treated as being exactly one frame (1 / fps
    seconds) apart.

    Args:
        p1: (x, y) position in the earlier frame.
        p2: (x, y) position in the later frame.
        fps: Frame rate of the video the positions were sampled from.
        meters_per_pixel: Scale applied to the distance between the points to
            convert it to meters. Defaults to 0.03, the value that used to be
            hard-coded here; pass 1.0 when the points are already in meters.

    Returns:
        The estimated speed in kilometers per hour.
    """
    distance = math.hypot(p2[0] - p1[0], p2[1] - p1[1])
    meters_per_second = distance * meters_per_pixel * fps
    return meters_per_second * 3.6  # 1 m/s == 3.6 km/h
def add_voice_to_video(video_path, verdict):
    """Mux the spoken verdict clip onto a video and return the new file's path.

    Args:
        video_path: Path of the video whose picture is kept.
        verdict: Decision string. "OUT" selects ``out.mp3``; any other value
            selects ``not_out.mp3``. Both clips are looked up relative to the
            current working directory.

    Returns:
        Path of a newly created temporary ``.mp4`` containing the video stream
        of ``video_path`` (stream-copied) and the verdict clip encoded as AAC.
        The caller is responsible for deleting it.

    Raises:
        Propagates any error raised while reading/exporting the clip with
        pydub or while running ffmpeg.
    """
    audio_file = "out.mp3" if verdict == "OUT" else "not_out.mp3"
    audio_temp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False).name
    final_output = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name

    succeeded = False
    try:
        # Re-export through pydub so ffmpeg always receives a plain mp3.
        AudioSegment.from_file(audio_file).export(audio_temp, format="mp3")

        # The audio must be a second *input*. Passing it as an `audio=`
        # keyword to output() turns it into an output option instead, so the
        # clip is never muxed into the result.
        video_in = ffmpeg.input(video_path)
        audio_in = ffmpeg.input(audio_temp)
        (
            ffmpeg
            .output(
                video_in.video,
                audio_in.audio,
                final_output,
                vcodec='copy',
                acodec='aac',
                strict='experimental',
            )
            .run(overwrite_output=True)
        )
        succeeded = True
    finally:
        # The intermediate mp3 is never needed afterwards; the output file is
        # only worth keeping when muxing actually succeeded.
        leftovers = [audio_temp] if succeeded else [audio_temp, final_output]
        for path in leftovers:
            try:
                os.remove(path)
            except OSError:
                pass
    return final_output
def extend_trajectory_with_rotation(points, bounce_idx, final_x=20.12):
    """Extrapolate a trajectory beyond the bounce with a quadratic fit.

    A polynomial ``y = a*x**2 + b*x + c`` is fitted to the points before
    ``bounce_idx`` and evaluated at 50 evenly spaced x values running from the
    bounce point's x to ``final_x``. A drift that grows linearly from 0 to
    0.05 is added to the extrapolated y values.

    Args:
        points: Sequence of (x, y) pairs.
        bounce_idx: Index into ``points`` of the bounce, or None when no
            bounce is known.
        final_x: x coordinate at which the extrapolation ends.

    Returns:
        A tuple ``(x_values, y_values)`` of lists. When the extrapolation
        succeeds these are the original coordinates followed by the 50
        extrapolated ones. The original coordinates are returned unchanged
        when ``bounce_idx`` is None, when it lies within the last two points,
        when fewer than three points precede it, or when the fit fails.
    """
    x_vals = [pt[0] for pt in points]
    y_vals = [pt[1] for pt in points]

    if bounce_idx is None or bounce_idx >= len(points) - 2:
        return x_vals, y_vals

    x_pre = x_vals[:bounce_idx]
    y_pre = y_vals[:bounce_idx]
    # Three coefficients cannot be determined from fewer than three samples.
    if len(x_pre) < 3:
        return x_vals, y_vals

    def poly2(x, a, b, c):
        return a * x**2 + b * x + c

    try:
        popt, _ = curve_fit(poly2, x_pre, y_pre)
    except (RuntimeError, TypeError, ValueError):
        # Fit did not converge or the input was unusable: fall back to the
        # detected points only.
        return x_vals, y_vals

    x_post = np.linspace(x_vals[bounce_idx], final_x, 50)
    curve_shift = np.linspace(0, 0.05, len(x_post))
    y_post = poly2(x_post, *popt) + curve_shift
    return x_vals + list(x_post), y_vals + list(y_post)
def draw_top_down_trajectory(points, bounce_frame_idx, output_path):
    """Plot the detected and extrapolated ball path and save it as a PNG.

    Args:
        points: Sequence of (x, y) positions; the axis labels treat x as pitch
            length and y as lateral movement, both in meters.
        bounce_frame_idx: Bounce index forwarded to
            ``extend_trajectory_with_rotation`` (may be None).
        output_path: Path of the related video. The image is written next to
            it, with the extension replaced by ``_trajectory.png``.

    Returns:
        Path of the saved image, or None when fewer than four points were
        supplied (nothing is drawn in that case).
    """
    if len(points) < 4:
        return None

    x_vals = [pt[0] for pt in points]
    y_vals = [pt[1] for pt in points]
    x_ext, y_ext = extend_trajectory_with_rotation(points, bounce_frame_idx)

    # Replace only the trailing extension. A plain str.replace(".mp4", ...)
    # rewrites every occurrence in the path and, when there is none, leaves
    # the path unchanged so the image would be saved over the video itself.
    base, _ = os.path.splitext(output_path)
    image_path = base + "_trajectory.png"

    fig = plt.figure(figsize=(10, 3))
    try:
        plt.plot(x_ext, y_ext, 'r-', label='Predicted Trajectory')
        plt.scatter(x_vals, y_vals, c='blue', s=10, label='Detected Points')
        plt.axvline(x=17.68, color='gray', linestyle='--', label='Stumps (17.68m)')
        plt.title("Top-Down Predicted Ball Path")
        plt.xlabel("Pitch Length (m)")
        plt.ylabel("Lateral Movement (m)")
        plt.grid(True)
        plt.legend()
        plt.savefig(image_path)
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)
    return image_path
def process_video(video_path):
    """Track the ball through the start of a video and produce annotated output.

    At most the first four seconds of the video are processed. For every
    frame the detection model is run, the first box of class 0 is taken as
    the ball, and the frame is annotated with the ball position, the trail so
    far, a speed estimate, a "Bounce!" marker and the current decision before
    being written to a temporary ``.mp4``.

    Args:
        video_path: Path of the input video.

    Returns:
        A tuple ``(final_video, topdown_image)``. ``final_video`` is the
        annotated video with the verdict audio added, or the silent annotated
        video when adding audio fails. ``topdown_image`` is the path of the
        trajectory plot, or None when fewer than four positions were detected.

    Raises:
        RuntimeError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("Video open failed")

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Fall back to the module default when the reported rate is 0.
        fps = cap.get(cv2.CAP_PROP_FPS) or FPS

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        temp_out = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
        out = cv2.VideoWriter(temp_out, fourcc, fps, (width, height))
        max_frames = int(fps * 4)
        frame_count = 0

        # Fixed image-space quadrilateral mapped onto a 20.12 x 3.05 rectangle.
        # NOTE(review): presumably the pitch outline in meters for a fixed
        # camera position; confirm the fractions against real footage.
        src = np.array([
            [width * 0.3, height * 0.4],
            [width * 0.7, height * 0.4],
            [width * 0.7, height * 0.9],
            [width * 0.3, height * 0.9],
        ], dtype=np.float32)
        dst = np.array([
            [0, 0],
            [20.12, 0],
            [20.12, 3.05],
            [0, 3.05],
        ], dtype=np.float32)
        H, _ = cv2.findHomography(src, dst)

        def project_point(px, py):
            """Map an image pixel through the homography H."""
            pt = np.array([[[px, py]]], dtype=np.float32)
            return cv2.perspectiveTransform(pt, H)[0][0]

        # Pixel centers and their projected counterparts; both lists grow in
        # lockstep, one entry per frame with a detection.
        trajectory, real_trajectory = [], []
        bounce_detected = False
        bounce_point_idx = None
        prev_center = None
        verdict = "NOT OUT"

        while frame_count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            results = model(frame)
            for box in results[0].boxes:
                # NOTE(review): class 0 is treated as the ball; verify against
                # the label map of the trained weights.
                if int(box.cls[0]) != 0:
                    continue
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)
                trajectory.append((cx, cy))
                x_m, y_m = project_point(cx, cy)
                real_trajectory.append((x_m, y_m))
                cv2.circle(frame, (cx, cy), 8, (0, 0, 255), -1)

                # A bounce is flagged the first time the image y coordinate
                # grows by more than 15 px between consecutive detections.
                if (
                    prev_center is not None
                    and not bounce_detected
                    and cy - prev_center[1] > 15
                ):
                    bounce_detected = True
                    # Store the position inside the trajectory lists, not the
                    # frame number: the plot helper indexes the detected
                    # points, and frames without a detection add no point.
                    bounce_point_idx = len(real_trajectory) - 1
                    cv2.putText(frame, "Bounce!", (cx, cy - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
                prev_center = (cx, cy)
                break  # only the first ball detection per frame is used

            # Redraw the whole trail on the current frame.
            for start, end in zip(trajectory, trajectory[1:]):
                cv2.line(frame, start, end, (255, 0, 0), 2)

            if len(trajectory) >= 2:
                # estimate_speed applies its own pixel-to-meter scale, so it
                # must be fed pixel centers; feeding it the already projected
                # metric coordinates under-reports the speed by that factor.
                speed = estimate_speed(trajectory[-2], trajectory[-1], fps)
                cv2.putText(frame, f"Speed: {int(speed)} km/h", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)

            if bounce_detected and real_trajectory:
                y_latest = real_trajectory[-1][1]
                # The verdict latches: once OUT it is never reset.
                if 1.0 < y_latest < 2.1:
                    verdict = "OUT"

            cv2.putText(frame, f"Decision: {verdict}", (30, height - 30), cv2.FONT_HERSHEY_SIMPLEX, 1.2,
                        (0,255,0) if verdict == "NOT OUT" else (0,0,255), 3)
            out.write(frame)
            frame_count += 1
    finally:
        # Release the capture and the writer even when processing fails.
        cap.release()
        if out is not None:
            out.release()

    topdown_image = draw_top_down_trajectory(real_trajectory, bounce_point_idx, temp_out)

    try:
        final_video = add_voice_to_video(temp_out, verdict)
    except Exception:
        # Audio is best-effort: fall back to the silent annotated video.
        final_video = temp_out
    return final_video, topdown_image