F1-steering-angle-model / utils /model_handler.py
daniel-saed's picture
Update utils/model_handler.py
1d44e42 verified
import numpy as np
import pandas as pd
from typing import List, Dict
from PIL import Image
import cv2
import onnxruntime as ort
from utils.helper import BASE_DIR
from pathlib import Path
def denormalize_angles(normalized_angles):
    """
    Map normalized angles in [-1, 1] to degrees in [-180, 180].

    Works on scalars and on NumPy arrays (element-wise).
    """
    min_degrees = -180
    max_degrees = 180
    # Shift [-1, 1] to [0, 1], then stretch over the full degree span.
    unit_fraction = (normalized_angles + 1) / 2
    return unit_fraction * (max_degrees - min_degrees) + min_degrees
def preprocess_image_exactly_like_pytorch(image_input):
    """
    Preprocess an OpenCV image (e.g. an edge map) for use with ONNX models.

    Steps: coerce to uint8, stretch contrast on very dark images, reduce to a
    single channel, resize to 224x224 with bilinear interpolation, scale to
    [0, 1], then normalize with mean=0.5 / std=0.5.

    Args:
        image_input: NumPy array from OpenCV with shape (H, W) or (H, W, C)
            (edge image, binary image, etc.).

    Returns:
        float32 NumPy array of shape (1, 1, 224, 224), ready for ONNX inference.

    Raises:
        ValueError: if the input is None, is empty, or is not 2-D or 3-D.
        TypeError: if the input is not a NumPy array.
    """
    if image_input is None:
        raise ValueError("Received None as image input")

    if not isinstance(image_input, np.ndarray):
        raise TypeError(f"Expected NumPy array, got {type(image_input)}")

    # Reject shapes the rest of the pipeline cannot handle (including empty
    # arrays) here, instead of failing later inside NumPy/PIL.
    if image_input.ndim not in (2, 3) or image_input.size == 0:
        raise ValueError(f"Invalid image shape: {image_input.shape}")

    # No defensive copy is needed: every step below rebinds `img` to a new
    # array and never writes into the caller's data.
    img = image_input

    # Edge/binary images usually hold 0/255 or 0/1; bring everything to uint8
    # in the [0, 255] range.
    if img.dtype != np.uint8:
        if np.max(img) <= 1.0:
            # Values look like [0, 1]; rescale to [0, 255].
            img = img * 255
        # Clip before casting: a bare astype(np.uint8) wraps out-of-range
        # values (e.g. 300 -> 44) instead of saturating them.
        img = np.clip(img, 0, 255).astype(np.uint8)

    # If the image is almost black but not empty, stretch its contrast so the
    # non-zero pixels become clearly visible.
    if np.mean(img) < 10 and np.max(img) > 0:
        img = cv2.normalize(img, None, 0, 255, cv2.NORM_MINMAX)

    # Reduce to a single (grayscale) channel.
    if img.ndim == 3:
        if img.shape[2] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        else:
            # Any other channel count: keep only the first channel.
            img = img[:, :, 0]

    try:
        # Resize through PIL (bilinear) rather than OpenCV.
        img_resized = Image.fromarray(img).resize((224, 224), Image.BILINEAR)

        # [0, 255] -> [0, 1]
        img_np = np.array(img_resized, dtype=np.float32) / 255.0

        # Normalize with mean=0.5, std=0.5 (as in PyTorch) -> [-1, 1]
        img_np = (img_np - 0.5) / 0.5

        # Add channel and batch axes: [batch_size, channels, height, width]
        return img_np[np.newaxis, np.newaxis, :, :]
    except Exception as e:
        # Print diagnostics about the offending input, then re-raise.
        print(f"Error processing image: {e}")
        print(f"Image shape: {image_input.shape}, dtype: {image_input.dtype}")
        print(f"Min value: {np.min(image_input)}, Max value: {np.max(image_input)}")
        raise
def correct_outlier_angles(df, window_size=5, std_threshold=3.0, max_diff_threshold=80.0):
    """
    Replace outlier steering angles with the mean of their neighbours.

    For every interior sample, a centred window of ``2 * (window_size // 2) + 1``
    values is taken from the ORIGINAL angles (corrections made during this pass
    do not feed into later windows). The sample is an outlier when its angular
    distance to the window mean exceeds ``std_threshold`` times the window
    standard deviation, or exceeds ``max_diff_threshold``. Outliers are replaced
    by the mean of the other values in the window. The first and last
    ``window_size // 2`` samples are never modified.

    Args:
        df: DataFrame with a 'steering_angle' column (degrees).
        window_size: Size of the local window; must be >= 1.
        std_threshold: Multiplier on the local standard deviation.
        max_diff_threshold: Absolute distance-from-mean limit, in degrees.

    Returns:
        A copy of ``df`` with the corrected 'steering_angle' column. The input
        DataFrame is not modified.

    Raises:
        ValueError: if ``window_size`` is smaller than 1.
        KeyError: if ``df`` has no 'steering_angle' column.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    def angular_distance(a, b):
        # Shortest distance on the 360-degree circle; always within [0, 180].
        diff = abs(a - b) % 360
        return min(diff, 360 - diff)

    angles = df['steering_angle'].to_numpy()
    # Corrected values are means and generally fractional; writing them into
    # an integer array would silently truncate them, so promote to float.
    if not np.issubdtype(angles.dtype, np.floating):
        angles = angles.astype(float)
    corrected_angles = angles.copy()

    half = window_size // 2
    # Only interior samples have a full window; edges are left untouched.
    for i in range(half, len(angles) - half):
        window = angles[i - half:i + half + 1]

        # Local statistics deliberately include the current value.
        curr_angle = angles[i]
        local_mean = np.mean(window)
        local_std = np.std(window) if len(window) > 1 else 0

        diff_from_mean = angular_distance(curr_angle, local_mean)
        is_outlier = (
            diff_from_mean > std_threshold * local_std
            or diff_from_mean > max_diff_threshold
        )
        if not is_outlier:
            continue

        # Diagnostic output for each detected outlier.
        print(i, curr_angle, local_mean, local_std, diff_from_mean)

        # Exclude the current value (at position `half`) from the replacement mean.
        neighbours = np.delete(window, half)
        if len(neighbours) > 0:
            corrected_angles[i] = np.mean(neighbours)

    corrected_df = df.copy()
    corrected_df['steering_angle'] = corrected_angles
    return corrected_df
class ModelHandler:
    """Load ONNX models on demand, run steering-angle inference on frames,
    and export the predictions as a DataFrame."""

    # Number of frames stacked into a single inference call; tune per hardware.
    BATCH_SIZE = 16
    # Number of times the outlier-correction pass is applied on export.
    OUTLIER_PASSES = 4

    def __init__(self):
        # Active onnxruntime session and the name it was loaded under
        # (both None until the first call to process_frames).
        self.current_model = None
        self.current_model_name = None
        # Frames per second of the source video. Nothing in this class sets
        # it; the caller must assign it before calling export_results().
        self.fps = None
        # Display name -> model location.
        # NOTE(review): the last two values are not file paths; they look like
        # placeholders, so selecting them would presumably fail to load.
        self.available_models = {
            "F1 Steering Angle Detection": Path(BASE_DIR) / "models" / "f1-steering-angle-model.onnx",
            "Track Position Analysis": "position_model",
            "Driver Behavior Analysis": "behavior_model"
        }

    def _load_model_if_needed(self, model_name: str):
        """Load the model only if it's not already loaded or if it's different.

        Raises:
            KeyError: if ``model_name`` is not a key of ``available_models``.
        """
        if self.current_model is not None and self.current_model_name == model_name:
            return
        print(f"Loading model: {model_name}")  # Debugging info
        # str() keeps this working on onnxruntime versions that reject Path objects.
        model_path = str(self.available_models[model_name])
        self.current_model = ort.InferenceSession(model_path)
        self.current_model_name = model_name

    def _predict_frames_individually(self, frames, first_frame_number, input_name) -> List[Dict]:
        """Run inference one frame at a time; a failing frame yields angle 0.0."""
        frame_results = []
        for frame_number, frame in enumerate(frames, start=first_frame_number):
            try:
                input_data = preprocess_image_exactly_like_pytorch(frame)
                ort_outputs = self.current_model.run(None, {input_name: input_data})
                angle = denormalize_angles(ort_outputs[0][0][0])
            except Exception as sub_e:
                print(f"Error processing individual frame {frame_number}: {sub_e}")
                # Keep one row per frame by emitting a default value.
                angle = 0.0
            frame_results.append({
                'frame_number': frame_number,
                'steering_angle': angle
            })
        return frame_results

    def process_frames(self, frames: List[np.ndarray], model_name: str) -> List[Dict]:
        """Process frames through the selected model in batches.

        Args:
            frames: Images to run inference on, in order.
            model_name: Key of ``available_models`` selecting the model.

        Returns:
            One dict per frame with 'frame_number' (1-based position in
            ``frames``) and 'steering_angle' (degrees). Empty list if
            ``frames`` is empty.
        """
        if not frames:
            return []

        # Load the model and resolve its input name once, not per batch.
        self._load_model_if_needed(model_name)
        input_name = self.current_model.get_inputs()[0].name

        results = []
        for batch_start in range(0, len(frames), self.BATCH_SIZE):
            current_batch = frames[batch_start:batch_start + self.BATCH_SIZE]

            # Pre-process every frame of the batch; each input is [1, 1, 224, 224].
            batch_inputs = []
            for frame in current_batch:
                try:
                    batch_inputs.append(preprocess_image_exactly_like_pytorch(frame))
                except Exception as e:
                    print(f"Error preprocessing frame: {e}")
                    # A zero tensor of the same shape keeps the batch aligned
                    # with the frame indices.
                    batch_inputs.append(np.zeros((1, 1, 224, 224), dtype=np.float32))

            try:
                # Stack along axis 0 and run the whole batch at once.
                batched_input = np.vstack(batch_inputs)
                predictions = self.current_model.run(None, {input_name: batched_input})[0]
                # Build into a local list so a failure midway leaves `results`
                # untouched; otherwise the fallback below would duplicate the
                # frames that were already appended.
                batch_results = [
                    {
                        'frame_number': batch_start + offset + 1,
                        'steering_angle': denormalize_angles(predictions[offset][0]),
                    }
                    for offset in range(len(current_batch))
                ]
            except Exception as e:
                print(f"Error in batch processing: {e}")
                # Batch inference failed: retry each frame on its own.
                batch_results = self._predict_frames_individually(
                    current_batch, batch_start + 1, input_name
                )

            results.extend(batch_results)

        return results

    def export_results(self, results: List[Dict]) -> pd.DataFrame:
        """Convert results to a pandas DataFrame for export.

        Adds a 'time' column (frame_number / fps, rounded to 3 decimals) and
        applies the outlier correction ``OUTLIER_PASSES`` times.

        Args:
            results: Output of ``process_frames``.

        Returns:
            DataFrame with 'frame_number', 'steering_angle' and 'time' columns;
            an empty DataFrame with those columns if ``results`` is empty.

        Raises:
            ValueError: if ``results`` is non-empty and ``self.fps`` is unset
                or not positive.
        """
        if len(results) == 0:
            return pd.DataFrame(columns=['frame_number', 'steering_angle', 'time'])

        if self.fps is None or self.fps <= 0:
            raise ValueError(f"fps must be a positive number before exporting, got {self.fps!r}")

        df = pd.DataFrame(results)
        df['time'] = (df['frame_number'] / self.fps).round(3)
        for _ in range(self.OUTLIER_PASSES):
            df = correct_outlier_angles(df, window_size=3, std_threshold=100, max_diff_threshold=15.0)
        return df