Spaces:
Sleeping
Sleeping
| """ | |
| Digital Review System (DRS) application for LBW decisions | |
| ======================================================== | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import shutil | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any, Dict, Tuple | |
| import gradio as gr | |
| from drs_modules.video_processing import trim_last_seconds, save_uploaded_video | |
| from drs_modules.detection import detect_and_track_ball | |
| from drs_modules.trajectory import estimate_trajectory, predict_stumps_intersection | |
| from drs_modules.lbw_decision import make_lbw_decision | |
| from drs_modules.visualization import ( | |
| generate_trajectory_plot, | |
| annotate_video_with_tracking, | |
| ) | |
def analyse_appeal(
    video_path: str,
    review_seconds: int = 8,
    pixels_per_metre: float = 50.0,
) -> Tuple[str, Dict[str, Any]]:
    """Run the full DRS pipeline on an appeal clip and return the verdict.

    Trims the clip to its final ``review_seconds``, tracks the ball, fits a
    trajectory, predicts stumps intersection, and renders an annotated replay
    plus a trajectory plot into a fresh temp directory.

    Args:
        video_path: Path to the recorded/uploaded match video.
        review_seconds: How many trailing seconds of footage to analyse.
        pixels_per_metre: Calibration constant used to convert pixel travel
            into metres for the speed estimate (default matches the previous
            hard-coded value of 50.0).

    Returns:
        A ``(decision_message, result)`` tuple where ``result`` holds the
        decision string, estimated ball speed in km/h, impact frame index,
        and the paths of the annotated video and trajectory plot.

    Note:
        The temp directory is intentionally not deleted here — the returned
        file paths are consumed by the UI after this function returns.
    """
    temp_dir = Path(tempfile.mkdtemp())
    trimmed_path = str(temp_dir / "trimmed.mp4")
    trim_last_seconds(video_path, trimmed_path, review_seconds)

    tracking_data = detect_and_track_ball(trimmed_path)
    centers = tracking_data["centers"]
    timestamps = tracking_data["timestamps"]

    # Guard: with fewer than two detections there is no trajectory to fit and
    # timestamps[-1] - timestamps[0] would raise IndexError. Fail gracefully.
    if len(centers) < 2 or len(timestamps) < 2:
        result: Dict[str, Any] = {
            "decision": "INCONCLUSIVE",
            "ball_speed_kmh": 0.0,
            "impact_frame_index": None,
            "annotated_video": None,
            "trajectory_plot": None,
        }
        return "Decision: INCONCLUSIVE (ball could not be tracked)", result

    trajectory_model = estimate_trajectory(centers, timestamps)
    will_hit_stumps = predict_stumps_intersection(trajectory_model)
    decision, impact_frame_idx = make_lbw_decision(
        centers, trajectory_model, will_hit_stumps
    )

    # Total path length in pixels, summed over consecutive tracked centres.
    total_distance_px = sum(
        ((x1 - x0) ** 2 + (y1 - y0) ** 2) ** 0.5
        for (x0, y0), (x1, y1) in zip(centers, centers[1:])
    )
    duration = timestamps[-1] - timestamps[0]
    if duration <= 0:
        # Degenerate timing (single-frame burst or bad timestamps).
        speed_kmh = 0.0
    else:
        speed_mps = (total_distance_px / pixels_per_metre) / duration
        speed_kmh = speed_mps * 3.6  # m/s -> km/h

    annotated_video_path = str(temp_dir / "annotated.mp4")
    annotate_video_with_tracking(
        trimmed_path,
        centers,
        trajectory_model,
        will_hit_stumps,
        impact_frame_idx,
        annotated_video_path,
    )

    plot_path = str(temp_dir / "trajectory_plot.png")
    generate_trajectory_plot(centers, trajectory_model, will_hit_stumps, plot_path)

    decision_message = f"Decision: {decision}"
    result = {
        "decision": decision,
        "ball_speed_kmh": round(speed_kmh, 2),
        "impact_frame_index": impact_frame_idx,
        "annotated_video": annotated_video_path,
        "trajectory_plot": plot_path,
    }
    return decision_message, result
def build_interface() -> gr.Blocks:
    """Build and return the two-tab Gradio UI for the DRS demo.

    Tab 1 records/uploads footage and stashes the saved path in a
    ``gr.State``; tab 2 runs the LBW analysis on that stored path.

    Bug fixes vs. the previous version:
    - ``analyse_button.click`` previously read from a hidden Textbox
      (``video_output``) that no handler ever wrote to, so analysis always
      received ``None``. It now reads the ``out_video_path`` State that the
      upload handler actually populates.
    - ``on_analyse`` previously read ``review_seconds.value`` — the
      component's *initial* value, not the user's current entry. The
      component is now passed through ``inputs`` so live values are used.
    """
    with gr.Blocks(title="Cricket LBW DRS Demo") as demo:
        gr.Markdown(
            """# Digital Review System (LBW)
This demo lets you record or upload cricket match footage and analyse LBW appeals.
You'll get a 3D trajectory plot, annotated replay, and OUT/NOT OUT decision.
"""
        )

        with gr.Tab("Live Match Recording"):
            video_input = gr.Video(
                label="Record or upload match video",
                sources=["upload", "webcam"],
            )
            # Holds the saved path of the most recent recording/upload.
            out_video_path = gr.State()

            def on_video_upload(video_file):
                """Persist the uploaded/recorded clip and return its path."""
                if video_file is None:
                    return ""
                # Gradio may hand back a file-like object or a plain path.
                file_path = video_file.name if hasattr(video_file, "name") else video_file
                return save_uploaded_video(file_path, video_file)

            video_input.change(
                fn=on_video_upload,
                inputs=[video_input],
                outputs=[out_video_path],
            )
            gr.Markdown(
                """
After recording or uploading a video, switch to the **LBW Review** tab and click **Analyse Appeal**.
"""
            )

        with gr.Tab("LBW Review"):
            with gr.Row():
                analyse_button = gr.Button("Analyse Appeal")
                review_seconds = gr.Number(
                    value=8, label="Seconds to review", minimum=2, maximum=20
                )
            decision_output = gr.Textbox(label="Decision", lines=1)
            ball_speed_output = gr.Textbox(label="Ball speed (km/h)", lines=1, interactive=False)
            impact_frame_output = gr.Textbox(label="Impact frame index", lines=1, interactive=False)
            annotated_video_output = gr.Video(label="Annotated replay video")
            trajectory_plot_output = gr.Image(label="3D Trajectory plot")

            def on_analyse(video_path, seconds):
                """Run the DRS pipeline; returns the five output values."""
                if not video_path or not os.path.exists(video_path):
                    return (
                        "Please record or upload a video in the first tab.",
                        None,
                        None,
                        None,
                        None,
                    )
                message, result = analyse_appeal(video_path, int(seconds))
                return (
                    message,
                    str(result["ball_speed_kmh"]),
                    str(result["impact_frame_index"]),
                    result["annotated_video"],
                    result["trajectory_plot"],
                )

            analyse_button.click(
                fn=on_analyse,
                # Read the State the upload handler writes, plus the live
                # value of the seconds field.
                inputs=[out_video_path, review_seconds],
                outputs=[
                    decision_output,
                    ball_speed_output,
                    impact_frame_output,
                    annotated_video_output,
                    trajectory_plot_output,
                ],
            )
    return demo
if __name__ == "__main__":
    # Entry point: build the UI and start the Gradio server.
    app = build_interface()
    app.launch()