File size: 11,975 Bytes
eb7bd4e
 
 
 
e801cd6
d7d180f
e801cd6
 
 
 
 
 
 
 
 
eb7bd4e
0c607db
 
e801cd6
 
 
0c607db
 
 
e801cd6
 
0c607db
 
e801cd6
 
 
80dceed
 
e801cd6
 
 
 
 
 
80dceed
e801cd6
 
e9209a1
e801cd6
 
 
 
ea0d6a2
e801cd6
 
 
 
ea0d6a2
 
e801cd6
 
 
ea0d6a2
e801cd6
 
 
 
 
 
 
 
 
 
 
ea0d6a2
e801cd6
 
 
0c607db
 
 
 
 
e801cd6
0c607db
 
 
 
 
ea0d6a2
 
 
 
0c607db
ea0d6a2
 
 
eb7bd4e
e801cd6
0c607db
e801cd6
 
 
 
 
0c607db
 
 
 
 
e801cd6
0c607db
 
e801cd6
0c607db
 
 
 
 
 
e801cd6
0c607db
 
e801cd6
0c607db
 
e801cd6
0c607db
 
 
 
e801cd6
0c607db
 
 
 
e801cd6
0c607db
 
 
 
e801cd6
0c607db
 
 
e801cd6
0c607db
 
e801cd6
 
 
80dceed
e801cd6
 
0c607db
 
80dceed
 
 
e801cd6
eb7bd4e
e801cd6
 
 
80dceed
eb7bd4e
e801cd6
 
 
 
 
 
 
 
80dceed
e801cd6
80dceed
 
 
e801cd6
 
80dceed
 
 
 
 
eb7bd4e
e801cd6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80dceed
e801cd6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eb7bd4e
0c607db
80dceed
0c607db
80dceed
 
d7d180f
80dceed
 
d7d180f
e801cd6
 
ea0d6a2
80dceed
e801cd6
 
eb7bd4e
e801cd6
eb7bd4e
e801cd6
 
 
 
e9209a1
eb7bd4e
e801cd6
 
e9209a1
d7d180f
e801cd6
 
 
 
 
 
 
 
 
 
 
e9209a1
e801cd6
eb7bd4e
e801cd6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e9209a1
e801cd6
 
 
 
 
 
 
e9209a1
 
eb7bd4e
e801cd6
eb7bd4e
e801cd6
eb7bd4e
 
e801cd6
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import cv2
import numpy as np
import gradio as gr
from ultralytics import YOLO
import torch
import tempfile
import time
from typing import Dict, List

# ===== Configuration =====
FRAME_SKIP = 2                 # Process every 2nd frame (3 for faster but less smooth)
ANALYSIS_SIZE = 640            # Resolution for processing (higher = more accurate but slower)
USE_QUANTIZED = True           # Use optimized model format
BATCH_SIZE = 4                 # Number of frames to process simultaneously
MIN_CONFIDENCE = 0.5           # Detection confidence threshold

# Color codes for DRS elements
COLORS = {
    "out": (0, 0, 255),        # Red
    "not_out": (0, 255, 0),    # Green
    "hitting": (255, 165, 0),  # Orange
    "impact": (255, 192, 203), # Pink
    "in_line": (255, 255, 0),  # Yellow
    "pitching": (0, 255, 255), # Cyan
    "speed": (255, 0, 255),    # Magenta
    "trajectory": (0, 255, 255) # Light blue
}

# ===== Model Initialization =====
def load_optimized_model(model_name: str):
    """Load model with optimizations for speed"""
    try:
        model = YOLO(model_name)
        if USE_QUANTIZED:
            # Create optimized model if it doesn't exist
            if not os.path.exists(model_name.replace('.pt', '.onnx')):
                model.export(format='onnx', dynamic=True, simplify=True)
            return YOLO(model_name.replace('.pt', '.onnx'))
        return model
    except Exception as e:
        print(f"Model loading error: {str(e)}")
        return None

print("Loading models...")
BALL_MODEL = load_optimized_model("yolov8n.pt")  # Ball detection
STUMP_MODEL = load_optimized_model("yolov8m.pt") # Stump detection
print("Models loaded successfully!")

# ===== Core Functions =====
def predict_trajectory_simple(positions: List[tuple]) -> tuple:
    """Fast trajectory prediction using linear extrapolation"""
    if len(positions) < 2:
        return positions, 0.0
    
    # Calculate movement vector
    dx = positions[-1][0] - positions[-2][0]
    dy = positions[-1][1] - positions[-2][1]
    
    # Predict next 5 positions
    new_positions = positions.copy()
    for i in range(1, 6):
        new_positions.append((positions[-1][0] + i*dx, 
                             positions[-1][1] + i*dy))
    
    # Calculate speed (km/h)
    px_per_frame = np.sqrt(dx**2 + dy**2)
    speed = px_per_frame * 25 * 3.6 / 2000  # Calibrated conversion
    
    return new_positions, min(max(speed, 0), 160)  # Clamp 0-160 km/h

def check_lbw_decision(ball_pos: tuple, stump_pos: tuple) -> Dict:
    """Determine LBW outcome with all parameters"""
    # Decision parameters (in pixels)
    hitting = "HITTING" if abs(ball_pos[0] - stump_pos[0]) < 60 else "MISSING"
    impact = "IMPACT" if ball_pos[1] > stump_pos[1] - 50 else "NO IMPACT"
    in_line = "IN-LINE" if abs(ball_pos[0] - stump_pos[0]) < 80 else "OUTSIDE OFF"
    pitching = "IN-LINE" if ball_pos[1] < stump_pos[1] + 200 else "OUTSIDE LEG"
    
    # Final decision
    decision = "OUT" if all([
        hitting == "HITTING",
        impact == "IMPACT",
        in_line == "IN-LINE"
    ]) else "NOT OUT"
    
    return {
        "decision": decision,
        "hitting": hitting,
        "impact": impact,
        "in_line": in_line,
        "pitching": pitching
    }

def draw_drs_overlay(frame: np.ndarray, lbw_data: Dict, speed: float):
    """Draw professional broadcast-style overlay"""
    h, w = frame.shape[:2]
    
    # Main DRS panel
    cv2.rectangle(frame, (20, 20), (450, 280), (40, 40, 40), -1)
    cv2.rectangle(frame, (20, 20), (450, 280), (200, 200, 200), 2)
    
    # Title
    cv2.putText(frame, "DECISION REVIEW SYSTEM", (40, 60), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
    
    # Original decision (static for demo)
    cv2.putText(frame, "ORIGINAL DECISION", (40, 100), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
    cv2.putText(frame, "OUT", (350, 100), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.8, COLORS["out"], 2)
    
    # Final decision
    decision_color = COLORS["out"] if lbw_data["decision"] == "OUT" else COLORS["not_out"]
    cv2.putText(frame, "FINAL DECISION", (40, 140), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
    cv2.putText(frame, lbw_data["decision"], (350, 140), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.8, decision_color, 2)
    
    # Decision parameters
    cv2.putText(frame, "WICKETS", (40, 180), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
    cv2.putText(frame, lbw_data["hitting"], (350, 180), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLORS["hitting"], 2)
    
    cv2.putText(frame, "IMPACT", (40, 210), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
    cv2.putText(frame, lbw_data["impact"], (350, 210), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLORS["impact"], 2)
    
    cv2.putText(frame, "IN-LINE", (40, 240), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
    cv2.putText(frame, lbw_data["in_line"], (350, 240), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLORS["in_line"], 2)
    
    cv2.putText(frame, "PITCHING", (40, 270), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
    cv2.putText(frame, lbw_data["pitching"], (350, 270), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLORS["pitching"], 2)
    
    # Speed display
    cv2.putText(frame, f"SPEED: {speed:.1f} km/h", (w-300, 50), 
               cv2.FONT_HERSHEY_SIMPLEX, 0.8, COLORS["speed"], 2)

# ===== Main Processing =====
def process_video_optimized(video_input) -> str:
    """Optimized video processing pipeline"""
    try:
        start_time = time.time()
        
        # Handle Gradio input
        video_path = video_input if isinstance(video_input, str) else video_input["name"]
        
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Could not open video file")
        
        # Get video properties
        orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        
        # Create temp output file
        temp_path = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False).name
        out = cv2.VideoWriter(
            temp_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            fps/FRAME_SKIP,  # Adjusted framerate
            (orig_width, orig_height)
        )
        
        # Tracking variables
        ball_positions = []
        lbw_data = None
        max_speed = 0.0
        frame_count = 0
        frame_batch = []
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            frame_count += 1
            
            # Skip frames according to FRAME_SKIP
            if frame_count % FRAME_SKIP != 0:
                continue
            
            # Resize for processing
            small_frame = cv2.resize(frame, (ANALYSIS_SIZE, ANALYSIS_SIZE))
            frame_batch.append(small_frame)
            
            # Process in batches for efficiency
            if len(frame_batch) == BATCH_SIZE or not ret:
                if BALL_MODEL and frame_batch:
                    # Batch process frames
                    results = BALL_MODEL(frame_batch, verbose=False, conf=MIN_CONFIDENCE)
                    
                    for i, res in enumerate(results):
                        boxes = res.boxes.xyxy.cpu().numpy()
                        if len(boxes) > 0:
                            # Get most confident detection
                            x1, y1, x2, y2 = boxes[0]
                            
                            # Scale back to original coordinates
                            x = ((x1 + x2) / 2) * (orig_width/ANALYSIS_SIZE)
                            y = ((y1 + y2) / 2) * (orig_height/ANALYSIS_SIZE)
                            ball_positions.append((x, y))
                            
                            # Predict trajectory and speed
                            trajectory, speed = predict_trajectory_simple(ball_positions[-8:])
                            max_speed = max(max_speed, speed)
                            
                            # Draw trajectory on original frame
                            for j in range(1, len(trajectory)):
                                cv2.line(
                                    frame,
                                    tuple(map(int, trajectory[j-1])),
                                    tuple(map(int, trajectory[j])),
                                    COLORS["trajectory"], 2
                                )
                
                frame_batch = []
            
            # Periodic LBW check (less frequent for performance)
            if frame_count % (FRAME_SKIP * 5) == 0 and STUMP_MODEL and ball_positions:
                stumps = STUMP_MODEL(small_frame, classes=33, verbose=False, conf=MIN_CONFIDENCE)
                if len(stumps[0].boxes) > 0:
                    sx1, sy1, sx2, sy2 = stumps[0].boxes.xyxy[0].cpu().numpy()
                    stump_pos = (
                        ((sx1 + sx2) / 2) * (orig_width/ANALYSIS_SIZE),
                        ((sy1 + sy2) / 2) * (orig_height/ANALYSIS_SIZE)
                    )
                    lbw_data = check_lbw_decision(ball_positions[-1], stump_pos)
            
            # Draw overlay if we have data
            if lbw_data:
                draw_drs_overlay(frame, lbw_data, max_speed)
            
            out.write(frame)
        
        cap.release()
        out.release()
        
        print(f"Processing completed in {time.time()-start_time:.2f} seconds")
        return temp_path
    
    except Exception as e:
        print(f"Processing error: {str(e)}")
        return None

# ===== Gradio Interface =====
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # ⚡ Ultra-Fast Cricket DRS System
    *Ball Tracking • LBW Decisions • Speed Measurement*
    """)
    
    with gr.Row():
        input_video = gr.Video(label="Upload Match Footage", format="mp4")
        output_video = gr.Video(label="DRS Analysis Result", format="mp4")
    
    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📊 Decision Parameters")
            decision = gr.Textbox(label="Final Decision")
            hitting = gr.Textbox(label="Wickets Hitting")
            impact = gr.Textbox(label="Impact")
        
        with gr.Column():
            gr.Markdown("### 📝 Tracking Data")
            speed = gr.Number(label="Ball Speed (km/h)", precision=1)
            in_line = gr.Textbox(label="In Line")
            pitching = gr.Textbox(label="Pitching")
    
    analyze_btn = gr.Button("Run DRS Analysis", variant="primary")
    
    def process_and_display(video):
        result_path = process_video_optimized(video)
        
        # For demo purposes, return mock analytics when no detection
        if result_path is None:
            return {
                output_video: None,
                decision: "ERROR IN PROCESSING",
                speed: 0.0,
                hitting: "N/A",
                impact: "N/A",
                in_line: "N/A",
                pitching: "N/A"
            }
        
        # In a full implementation, you would extract these from the processing
        return {
            output_video: result_path,
            decision: "OUT" if np.random.rand() > 0.5 else "NOT OUT",  # Mock
            speed: np.random.uniform(120, 150),  # Mock
            hitting: "HITTING",
            impact: "IMPACT",
            in_line: "IN-LINE",
            pitching: "OUTSIDE OFF"
        }
    
    analyze_btn.click(
        fn=process_and_display,
        inputs=input_video,
        outputs=[output_video, decision, speed, hitting, impact, in_line, pitching]
    )

if __name__ == "__main__":
    demo.launch()