import numpy as np import pandas as pd from typing import List, Dict from PIL import Image import cv2 import onnxruntime as ort from utils.helper import BASE_DIR from pathlib import Path def denormalize_angles(normalized_angles): """ Convierte ángulos normalizados [-1,1] a grados [-180,180] """ return (normalized_angles + 1) / 2 * (180 - (-180)) + (-180) def preprocess_image_exactly_like_pytorch(image_input): """ Preprocesa una imagen de OpenCV (como adjusted_edges) para usarla con modelos ONNX. Args: image_input: Array NumPy de OpenCV (imagen de bordes, binaria, etc.) Returns: Array NumPy listo para inferencia con ONNX """ # Verificar que la entrada no sea None if image_input is None: raise ValueError("Received None as image input") # Asegurar que la imagen es un array NumPy if not isinstance(image_input, np.ndarray): raise TypeError(f"Expected NumPy array, got {type(image_input)}") # Verificar que la imagen tiene dimensiones válidas if len(image_input.shape) < 2: raise ValueError(f"Invalid image shape: {image_input.shape}") # Copia para no modificar la original img_copy = image_input.copy() # Si es una imagen de bordes o binaria, normalmente tiene valores 0 y 255 # o 0 y 1. Asegurarse de que está en el rango [0, 255] if img_copy.dtype != np.uint8: if np.max(img_copy) <= 1.0: # Si está en rango [0, 1], convertir a [0, 255] img_copy = (img_copy * 255).astype(np.uint8) else: # De otro modo, simplemente convertir a uint8 img_copy = img_copy.astype(np.uint8) # Para imágenes de bordes o binarias, asegurar que tenemos valores claros # (si todos los valores son muy bajos, puede que no se vea nada) if np.mean(img_copy) < 10 and np.max(img_copy) > 0: # Estirar el contraste para mejor visualización img_copy = cv2.normalize(img_copy, None, 0, 255, cv2.NORM_MINMAX) # Asegurar que la imagen es de un solo canal (escala de grises) if len(img_copy.shape) == 3: if img_copy.shape[2] == 3: # Convertir imagen BGR a escala de grises img_copy = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY) else: # Tomar solo el primer canal img_copy = img_copy[:, :, 0] try: # Convertir de NumPy array a PIL Image img_pil = Image.fromarray(img_copy) # Redimensionar con PIL img_resized = img_pil.resize((224, 224), Image.BILINEAR) # Convertir a numpy array img_np = np.array(img_resized, dtype=np.float32) # Normalizar de [0,255] a [0,1] img_np = img_np / 255.0 # Normalizar con mean=0.5, std=0.5 (como en PyTorch) img_np = (img_np - 0.5) / 0.5 # Reformatear para ONNX [batch_size, channels, height, width] img_np = np.expand_dims(img_np, axis=0) # Añadir dimensión de canal img_np = np.expand_dims(img_np, axis=0) # Añadir dimensión de batch return img_np except Exception as e: print(f"Error processing image: {e}") print(f"Image shape: {image_input.shape}, dtype: {image_input.dtype}") print(f"Min value: {np.min(image_input)}, Max value: {np.max(image_input)}") raise def correct_outlier_angles(df, window_size=5, std_threshold=3.0, max_diff_threshold=80.0): angles = df['steering_angle'].values corrected_angles = angles.copy() for i in range(len(angles)): if i < window_size // 2 or i >= len(angles) - window_size // 2: # Evitar bordes continue # Definir ventana local start_idx = max(0, i - window_size // 2) end_idx = min(len(angles), i + window_size // 2 + 1) window = angles[start_idx:end_idx] # Calcular estadísticas locales incluyendo el valor actual curr_angle = angles[i] local_mean = np.mean(window) local_std = np.std(window) if len(window) > 1 else 0 # Calcular distancia angular mínima considerando el rango cíclico (-180° a 180°) def angular_distance(a, b): diff = abs(a - b) return min(diff, 360 - diff) if diff > 180 else diff diff_from_mean = angular_distance(curr_angle, local_mean) # Detectar outlier is_outlier = (diff_from_mean > std_threshold * local_std) or (diff_from_mean > max_diff_threshold) if is_outlier: print(i, curr_angle, local_mean, local_std, diff_from_mean) # Excluir el valor actual del cálculo del promedio de corrección corrected_window = np.delete(window, i - start_idx) if len(corrected_window) > 0: corrected_mean = np.mean(corrected_window) # Ajustar el ángulo corregido al rango cíclico más cercano diff_to_corrected = angular_distance(curr_angle, corrected_mean) if diff_to_corrected > 180: corrected_angles[i] = corrected_mean - 360 if corrected_mean > 0 else corrected_mean + 360 else: corrected_angles[i] = corrected_mean # Crear nuevo DataFrame corrected_df = df.copy() corrected_df['steering_angle'] = corrected_angles return corrected_df class ModelHandler: def __init__(self): # Placeholder for actual model loading self.current_model = None self.current_model_name = None self.fps = None self.available_models = { "F1 Steering Angle Detection": Path(BASE_DIR) / "models" / "f1-steering-angle-model.onnx", "Track Position Analysis": "position_model", "Driver Behavior Analysis": "behavior_model" } def _load_model_if_needed(self, model_name: str): """Load the model only if it's not already loaded or if it's different""" if self.current_model is None or self.current_model_name != model_name: print(f"Loading model: {model_name}") # Debugging info self.current_model = ort.InferenceSession(self.available_models[model_name]) self.current_model_name = model_name def process_frames(self, frames: List[np.ndarray], model_name: str) -> Dict: """Process frames through selected model with efficient batch processing""" if not frames: return [] # Load model only once self._load_model_if_needed(model_name) # Get input name once input_name = self.current_model.get_inputs()[0].name results = [] # Define optimal batch size - ajusta según tu hardware BATCH_SIZE = 16 index = 0 # Process frames in batches for batch_start in range(0, len(frames), BATCH_SIZE): # Get current batch batch_end = min(batch_start + BATCH_SIZE, len(frames)) current_batch = frames[batch_start:batch_end] batch_inputs = [] # Pre-process all frames in the current batch for frame in current_batch: try: # Procesar imagen pero mantener en formato que permita agrupación #cv2.imwrite(r"img_test/"+str(index)+".jpg", frame) index= index+1 processed_input = preprocess_image_exactly_like_pytorch(frame) batch_inputs.append(processed_input) except Exception as e: print(f"Error preprocessing frame: {e}") # Usar un tensor vacío del mismo tamaño como reemplazo empty_tensor = np.zeros((1, 1, 224, 224), dtype=np.float32) batch_inputs.append(empty_tensor) try: # Combinar todos los inputs pre-procesados en un solo lote grande # Cada input tiene forma [1, 1, 224, 224], los concatenamos en la dimensión 0 batched_input = np.vstack(batch_inputs) # Ejecutar inferencia sobre todo el lote a la vez ort_inputs = {input_name: batched_input} ort_outputs = self.current_model.run(None, ort_inputs) # Procesar resultados por lotes for i in range(len(current_batch)): frame_idx = batch_start + i +1 predicted_angle_normalized = ort_outputs[0][i][0] angle = denormalize_angles(predicted_angle_normalized) confidence = np.random.uniform(0.7, 0.99) results.append({ 'frame_number': frame_idx, 'steering_angle': angle, }) except Exception as e: print(f"Error in batch processing: {e}") # Si falla el procesamiento por lotes, volver a procesar individualmente for i, frame in enumerate(current_batch): frame_idx = batch_start + i +1 try: input_data = preprocess_image_exactly_like_pytorch(frame) ort_inputs = {input_name: input_data} ort_outputs = self.current_model.run(None, ort_inputs) predicted_angle_normalized = ort_outputs[0][0][0] angle = denormalize_angles(predicted_angle_normalized) confidence = np.random.uniform(0.7, 0.99) results.append({ 'frame_number': frame_idx, 'steering_angle': angle }) except Exception as sub_e: print(f"Error processing individual frame {frame_idx}: {sub_e}") # Añadir un resultado con valores predeterminados results.append({ 'frame_number': frame_idx, 'steering_angle': 0.0 }) return results def export_results(self, results: Dict) -> pd.DataFrame: """Convert results to pandas DataFrame for export""" df = pd.DataFrame(results) df['time'] = round(df['frame_number'] / self.fps,3) df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0) df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0) df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0) df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0) return df