# GullyDRS LBW Predictor app -- drs_engine.py
import cv2
import os
import math
import numpy as np
import tempfile
import matplotlib.pyplot as plt
from ultralytics import YOLO
from pydub import AudioSegment
import ffmpeg
from scipy.optimize import curve_fit
# YOLO weights for ball detection. Class id 0 is treated as the ball
# downstream -- presumably how best.pt was trained; confirm against the
# training config.
model = YOLO("best.pt")
# Fallback frame rate used when the video container reports no FPS.
FPS = 30
def estimate_speed(p1, p2, fps, meters_per_pixel=0.03):
    """Estimate ball speed in km/h from two consecutive positions.

    Args:
        p1: (x, y) position in the earlier frame.
        p2: (x, y) position in the later frame.
        fps: frames per second of the source video (frames/second).
        meters_per_pixel: calibration scale converting the pixel distance to
            meters. Defaults to 0.03, the value previously hard-coded here;
            parameterized so callers with a different camera calibration can
            override it.

    Returns:
        Estimated speed in km/h (float).
    """
    # Straight-line displacement between the two detections, in pixels.
    dist = math.hypot(p2[0] - p1[0], p2[1] - p1[1])
    # pixels/frame -> meters/second
    mps = dist * meters_per_pixel * fps
    # meters/second -> km/h
    return mps * 3.6
def add_voice_to_video(video_path, verdict):
    """Mux a verdict voice-over onto the annotated video.

    Picks "out.mp3" or "not_out.mp3" (expected in the working directory)
    based on *verdict* and muxes it with the video into a new temp .mp4.

    Args:
        video_path: path to the silent annotated video.
        verdict: "OUT" selects out.mp3; anything else selects not_out.mp3.

    Returns:
        Path to the new .mp4 containing the voice-over track.
    """
    audio_file = "out.mp3" if verdict == "OUT" else "not_out.mp3"
    final_output = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
    # BUG FIX: the old call passed audio=<path> as an output kwarg, which
    # ffmpeg-python turns into an invalid "-audio" flag. The audio must be a
    # second input whose stream is passed to output() alongside the video
    # stream. Video is stream-copied; audio is re-encoded to AAC for mp4.
    video_in = ffmpeg.input(video_path)
    audio_in = ffmpeg.input(audio_file)
    (
        ffmpeg
        .output(video_in.video, audio_in.audio, final_output,
                vcodec='copy', acodec='aac', strict='experimental')
        .run(overwrite_output=True)
    )
    return final_output
def extend_trajectory_with_rotation(points, bounce_idx, final_x=20.12):
    """Extend a ball trajectory beyond the bounce point toward the stumps.

    Fits a quadratic to the pre-bounce points and extrapolates it from the
    bounce x-position to *final_x*, adding a small linear lateral drift that
    models post-bounce deviation.

    Args:
        points: list of (x, y) real-world coordinates (meters).
        bounce_idx: index into *points* where the bounce was detected, or None.
        final_x: x-coordinate to extrapolate to (default 20.12, the pitch
            length in meters).

    Returns:
        (x_list, y_list): the original coordinates, with 50 extrapolated
        points appended when a usable bounce index exists; otherwise the
        original coordinates unchanged.
    """
    x_vals = [pt[0] for pt in points]
    y_vals = [pt[1] for pt in points]
    # Need a bounce index with at least 3 pre-bounce points (a quadratic has
    # 3 parameters) and at least 2 points after it to be worth extending.
    # The old code let curve_fit raise on short inputs and hid it behind a
    # bare except.
    if bounce_idx is None or bounce_idx < 3 or bounce_idx >= len(points) - 2:
        return x_vals, y_vals

    def poly2(x, a, b, c):
        return a * x**2 + b * x + c

    x_pre = x_vals[:bounce_idx]
    y_pre = y_vals[:bounce_idx]
    try:
        popt, _ = curve_fit(poly2, x_pre, y_pre)
    except Exception:
        # Fit can still fail on degenerate data (e.g. all x equal); fall back
        # to the detected points only, as before -- but no longer with a bare
        # except that would swallow KeyboardInterrupt/SystemExit.
        return x_vals, y_vals
    x_post = np.linspace(x_vals[bounce_idx], final_x, 50)
    # Small linear drift (0 -> 0.05 m) emulating post-bounce "rotation".
    curve_shift = np.linspace(0, 0.05, len(x_post))
    y_post = poly2(x_post, *popt) + curve_shift
    return x_vals + list(x_post), y_vals + list(y_post)
def draw_top_down_trajectory(points, bounce_frame_idx, output_path):
    """Render a top-down plot of the detected and predicted ball path.

    Plots the detected (x, y) points plus the extrapolated post-bounce
    trajectory, with a dashed marker at the stumps (17.68 m), and saves the
    figure as a PNG derived from *output_path* (".mp4" -> "_trajectory.png").

    Args:
        points: list of (x, y) real-world ball positions (meters).
        bounce_frame_idx: bounce index passed through to the extrapolator.
        output_path: video output path used to derive the image filename.

    Returns:
        Path to the saved PNG, or None when fewer than 4 points exist.
    """
    if len(points) < 4:
        return None

    detected_x = [p[0] for p in points]
    detected_y = [p[1] for p in points]
    predicted_x, predicted_y = extend_trajectory_with_rotation(points, bounce_frame_idx)

    plt.figure(figsize=(10, 3))
    plt.plot(predicted_x, predicted_y, 'r-', label='Predicted Trajectory')
    plt.scatter(detected_x, detected_y, c='blue', s=10, label='Detected Points')
    plt.axvline(x=17.68, color='gray', linestyle='--', label='Stumps (17.68m)')
    plt.title("Top-Down Predicted Ball Path")
    plt.xlabel("Pitch Length (m)")
    plt.ylabel("Lateral Movement (m)")
    plt.grid(True)
    plt.legend()

    image_path = output_path.replace(".mp4", "_trajectory.png")
    plt.savefig(image_path)
    plt.close()
    return image_path
def process_video(video_path):
    """Run the full LBW analysis pipeline on a video clip.

    Detects the ball frame-by-frame with the YOLO model, tracks its pixel
    and (homography-projected) real-world trajectory, flags the bounce,
    overlays speed and a verdict on each frame, then writes an annotated
    video, a top-down trajectory image, and a voice-over track.

    Args:
        video_path: path to the input video file.

    Returns:
        (final_video_path, topdown_image_path): paths to the annotated video
        (with voice-over if muxing succeeded) and the trajectory PNG
        (None when too few points were detected).

    Raises:
        Exception: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise Exception("Video open failed")
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Fall back to the module default when the container reports 0/None fps.
    fps = cap.get(cv2.CAP_PROP_FPS) or FPS
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    temp_out = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
    out = cv2.VideoWriter(temp_out, fourcc, fps, (width, height))
    # Process at most ~4 seconds of footage.
    max_frames = int(fps * 4)
    frame_count = 0
    # Homography from a fixed image-space quadrilateral to real-world pitch
    # coordinates in meters (20.12 m long x 3.05 m wide). The fixed fractions
    # assume the pitch occupies this region of the frame -- TODO confirm this
    # matches the actual camera setup.
    src = np.array([
        [width * 0.3, height * 0.4],
        [width * 0.7, height * 0.4],
        [width * 0.7, height * 0.9],
        [width * 0.3, height * 0.9],
    ], dtype=np.float32)
    dst = np.array([
        [0, 0],
        [20.12, 0],
        [20.12, 3.05],
        [0, 3.05]
    ], dtype=np.float32)
    H, _ = cv2.findHomography(src, dst)

    def project_point(px, py):
        # Map one pixel coordinate to pitch coordinates (meters) via H.
        pt = np.array([[[px, py]]], dtype=np.float32)
        return cv2.perspectiveTransform(pt, H)[0][0]

    trajectory, real_trajectory = [], []  # pixel-space / meter-space points
    bounce_detected = False
    bounce_frame_idx = None  # frame index at which the bounce was flagged
    prev_center = None
    verdict = "NOT OUT"
    while frame_count < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        results = model(frame)
        for box in results[0].boxes:
            # Class id 0 is presumably the ball -- depends on how best.pt was
            # trained; verify against the training labels.
            if int(box.cls[0]) == 0:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)
                trajectory.append((cx, cy))
                x_m, y_m = project_point(cx, cy)
                real_trajectory.append((x_m, y_m))
                cv2.circle(frame, (cx, cy), 8, (0, 0, 255), -1)
                # A sudden downward jump of the center (>15 px between
                # consecutive detections) is treated as the bounce.
                if prev_center and not bounce_detected and cy - prev_center[1] > 15:
                    bounce_detected = True
                    bounce_frame_idx = frame_count
                    cv2.putText(frame, "Bounce!", (cx, cy - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
                prev_center = (cx, cy)
                break  # only the first class-0 detection per frame is used
        # Redraw the full pixel-space trail accumulated so far on this frame.
        for i in range(1, len(trajectory)):
            x1, y1 = trajectory[i - 1]
            x2, y2 = trajectory[i]
            cv2.line(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
        if len(real_trajectory) >= 2:
            x1, y1 = real_trajectory[-2]
            x2, y2 = real_trajectory[-1]
            # NOTE(review): these points are already meter coordinates, but
            # estimate_speed applies a pixels->meters scale again -- the
            # displayed speed may be off by that factor; verify calibration.
            speed = estimate_speed((x1, y1), (x2, y2), fps)
            cv2.putText(frame, f"Speed: {int(speed)} km/h", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)
        if bounce_detected and real_trajectory:
            x_latest, y_latest = real_trajectory[-1]
            # OUT when the latest lateral position lies within 1.0-2.1 m --
            # a heuristic stump-line corridor, not a full LBW rule check.
            if 1.0 < y_latest < 2.1:
                verdict = "OUT"
        cv2.putText(frame, f"Decision: {verdict}", (30, height - 30), cv2.FONT_HERSHEY_SIMPLEX, 1.2,
        (0,255,0) if verdict == "NOT OUT" else (0,0,255), 3)
        out.write(frame)
        frame_count += 1
    cap.release()
    out.release()
    topdown_image = draw_top_down_trajectory(real_trajectory, bounce_frame_idx, temp_out)
    try:
        final_video = add_voice_to_video(temp_out, verdict)
    except:
        # Best effort: if muxing fails (missing mp3s / ffmpeg), return the
        # silent annotated video instead of crashing.
        final_video = temp_out
    return final_video, topdown_image