File size: 11,128 Bytes
b60c4ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1d44e42
b60c4ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import numpy as np
import pandas as pd
from typing import List, Dict
from PIL import Image
import cv2
import onnxruntime as ort
from utils.helper import BASE_DIR
from pathlib import Path

def denormalize_angles(normalized_angles):
    """
    Convierte ángulos normalizados [-1,1] a grados [-180,180]
    """
    return (normalized_angles + 1) / 2 * (180 - (-180)) + (-180)

def preprocess_image_exactly_like_pytorch(image_input):
    """
    Preprocesa una imagen de OpenCV (como adjusted_edges) 
    para usarla con modelos ONNX.
    
    Args:
        image_input: Array NumPy de OpenCV (imagen de bordes, binaria, etc.)
        
    Returns:
        Array NumPy listo para inferencia con ONNX
    """
    # Verificar que la entrada no sea None
    if image_input is None:
        raise ValueError("Received None as image input")
        
    # Asegurar que la imagen es un array NumPy
    if not isinstance(image_input, np.ndarray):
        raise TypeError(f"Expected NumPy array, got {type(image_input)}")
        
    # Verificar que la imagen tiene dimensiones válidas
    if len(image_input.shape) < 2:
        raise ValueError(f"Invalid image shape: {image_input.shape}")
    
    # Copia para no modificar la original    
    img_copy = image_input.copy()
    
    # Si es una imagen de bordes o binaria, normalmente tiene valores 0 y 255
    # o 0 y 1. Asegurarse de que está en el rango [0, 255]
    if img_copy.dtype != np.uint8:
        if np.max(img_copy) <= 1.0:
            # Si está en rango [0, 1], convertir a [0, 255]
            img_copy = (img_copy * 255).astype(np.uint8)
        else:
            # De otro modo, simplemente convertir a uint8
            img_copy = img_copy.astype(np.uint8)
    
    # Para imágenes de bordes o binarias, asegurar que tenemos valores claros
    # (si todos los valores son muy bajos, puede que no se vea nada)
    if np.mean(img_copy) < 10 and np.max(img_copy) > 0:
        # Estirar el contraste para mejor visualización
        img_copy = cv2.normalize(img_copy, None, 0, 255, cv2.NORM_MINMAX)
    
    # Asegurar que la imagen es de un solo canal (escala de grises)
    if len(img_copy.shape) == 3:
        if img_copy.shape[2] == 3:
            # Convertir imagen BGR a escala de grises
            img_copy = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
        else:
            # Tomar solo el primer canal
            img_copy = img_copy[:, :, 0]
    
    try:
        # Convertir de NumPy array a PIL Image
        img_pil = Image.fromarray(img_copy)
        
        # Redimensionar con PIL 
        img_resized = img_pil.resize((224, 224), Image.BILINEAR)
        
        # Convertir a numpy array
        img_np = np.array(img_resized, dtype=np.float32)
        
        # Normalizar de [0,255] a [0,1]
        img_np = img_np / 255.0
        
        # Normalizar con mean=0.5, std=0.5 (como en PyTorch)
        img_np = (img_np - 0.5) / 0.5
        
        # Reformatear para ONNX [batch_size, channels, height, width]
        img_np = np.expand_dims(img_np, axis=0)  # Añadir dimensión de canal
        img_np = np.expand_dims(img_np, axis=0)  # Añadir dimensión de batch
        
        return img_np
    except Exception as e:
        print(f"Error processing image: {e}")
        print(f"Image shape: {image_input.shape}, dtype: {image_input.dtype}")
        print(f"Min value: {np.min(image_input)}, Max value: {np.max(image_input)}")
        raise

def correct_outlier_angles(df, window_size=5, std_threshold=3.0, max_diff_threshold=80.0):
    
    angles = df['steering_angle'].values
    corrected_angles = angles.copy()
    
    for i in range(len(angles)):
        if i < window_size // 2 or i >= len(angles) - window_size // 2:  # Evitar bordes
            continue
        
        # Definir ventana local
        start_idx = max(0, i - window_size // 2)
        end_idx = min(len(angles), i + window_size // 2 + 1)
        window = angles[start_idx:end_idx]
        
        # Calcular estadísticas locales incluyendo el valor actual
        curr_angle = angles[i]
        local_mean = np.mean(window)
        local_std = np.std(window) if len(window) > 1 else 0
        
        # Calcular distancia angular mínima considerando el rango cíclico (-180° a 180°)
        def angular_distance(a, b):
            diff = abs(a - b)
            return min(diff, 360 - diff) if diff > 180 else diff
        
        diff_from_mean = angular_distance(curr_angle, local_mean)
        
        # Detectar outlier
        is_outlier = (diff_from_mean > std_threshold * local_std) or (diff_from_mean > max_diff_threshold)
        
        if is_outlier:
            print(i, curr_angle, local_mean, local_std, diff_from_mean)
            # Excluir el valor actual del cálculo del promedio de corrección
            corrected_window = np.delete(window, i - start_idx)
            if len(corrected_window) > 0:
                corrected_mean = np.mean(corrected_window)
                # Ajustar el ángulo corregido al rango cíclico más cercano
                diff_to_corrected = angular_distance(curr_angle, corrected_mean)
                if diff_to_corrected > 180:
                    corrected_angles[i] = corrected_mean - 360 if corrected_mean > 0 else corrected_mean + 360
                else:
                    corrected_angles[i] = corrected_mean
    
    # Crear nuevo DataFrame
    corrected_df = df.copy()
    corrected_df['steering_angle'] = corrected_angles
    return corrected_df

class ModelHandler:
    def __init__(self):
        # Placeholder for actual model loading
        self.current_model = None
        self.current_model_name = None
        self.fps = None
        self.available_models = {
            "F1 Steering Angle Detection": Path(BASE_DIR) / "models" / "f1-steering-angle-model.onnx",
            "Track Position Analysis": "position_model",
            "Driver Behavior Analysis": "behavior_model"
        }
        
    def _load_model_if_needed(self, model_name: str):
        """Load the model only if it's not already loaded or if it's different"""
        if self.current_model is None or self.current_model_name != model_name:
            print(f"Loading model: {model_name}")  # Debugging info
            self.current_model = ort.InferenceSession(self.available_models[model_name])
            self.current_model_name = model_name
        
    def process_frames(self, frames: List[np.ndarray], model_name: str) -> Dict:
        """Process frames through selected model with efficient batch processing"""
        if not frames:
            return []
        
        # Load model only once
        self._load_model_if_needed(model_name)
        
        # Get input name once
        input_name = self.current_model.get_inputs()[0].name
        
        results = []
        
        # Define optimal batch size - ajusta según tu hardware
        BATCH_SIZE = 16
        index = 0
        # Process frames in batches
        for batch_start in range(0, len(frames), BATCH_SIZE):
            # Get current batch
            batch_end = min(batch_start + BATCH_SIZE, len(frames))
            current_batch = frames[batch_start:batch_end]
            batch_inputs = []
            
            # Pre-process all frames in the current batch
            for frame in current_batch:
                try:
                    # Procesar imagen pero mantener en formato que permita agrupación
                    
                    #cv2.imwrite(r"img_test/"+str(index)+".jpg", frame)
                    index= index+1
                    processed_input = preprocess_image_exactly_like_pytorch(frame)
                    batch_inputs.append(processed_input)
                except Exception as e:
                    print(f"Error preprocessing frame: {e}")
                    # Usar un tensor vacío del mismo tamaño como reemplazo
                    empty_tensor = np.zeros((1, 1, 224, 224), dtype=np.float32)
                    batch_inputs.append(empty_tensor)
            
            try:
                # Combinar todos los inputs pre-procesados en un solo lote grande
                # Cada input tiene forma [1, 1, 224, 224], los concatenamos en la dimensión 0
                batched_input = np.vstack(batch_inputs)
                
                # Ejecutar inferencia sobre todo el lote a la vez
                ort_inputs = {input_name: batched_input}
                ort_outputs = self.current_model.run(None, ort_inputs)
                
                # Procesar resultados por lotes
                for i in range(len(current_batch)):
                    frame_idx = batch_start + i +1
                    predicted_angle_normalized = ort_outputs[0][i][0]
                    angle = denormalize_angles(predicted_angle_normalized)
                    confidence = np.random.uniform(0.7, 0.99)
                    
                    results.append({
                        'frame_number': frame_idx,
                        'steering_angle': angle,
                    })
                    
            except Exception as e:
                print(f"Error in batch processing: {e}")
                # Si falla el procesamiento por lotes, volver a procesar individualmente
                for i, frame in enumerate(current_batch):
                    frame_idx = batch_start + i +1
                    try:
                        input_data = preprocess_image_exactly_like_pytorch(frame)
                        ort_inputs = {input_name: input_data}
                        ort_outputs = self.current_model.run(None, ort_inputs)
                        
                        predicted_angle_normalized = ort_outputs[0][0][0]
                        angle = denormalize_angles(predicted_angle_normalized)
                        confidence = np.random.uniform(0.7, 0.99)
                        
                        results.append({
                            'frame_number': frame_idx,
                            'steering_angle': angle
                        })
                    except Exception as sub_e:
                        print(f"Error processing individual frame {frame_idx}: {sub_e}")
                        # Añadir un resultado con valores predeterminados
                        results.append({
                            'frame_number': frame_idx,
                            'steering_angle': 0.0
                        })
        
        return results
        
    def export_results(self, results: Dict) -> pd.DataFrame:
        """Convert results to pandas DataFrame for export"""
        df = pd.DataFrame(results)
        df['time'] = round(df['frame_number'] / self.fps,3)
        
        df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0)
        df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0)
        df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0)
        df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0)
            
        
        return df