F1-steering-angle-model / utils /video_processor.py
daniel-saed's picture
Update utils/video_processor.py
724c049 verified
import cv2
import numpy as np
from typing import List, Tuple
import tempfile
import time
import functools
from collections import defaultdict
import onnxruntime as ort
from utils.model_handler import ModelHandler
from utils.helper import (
preprocess_image_tensor,
postprocess_outputs,
recortar_imagen,
recortar_imagen_again,
calculate_black_pixels_percentage,
adaptive_edge_detection,
)
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from utils.helper import BASE_DIR
import os
class Profiler:
"""Clase para trackear el tiempo de ejecución de las funciones"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(Profiler, cls).__new__(cls)
cls._instance.function_times = defaultdict(list)
cls._instance.call_counts = defaultdict(int)
return cls._instance
def track_time(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
elapsed = end_time - start_time
self.function_times[func.__name__].append(elapsed)
self.call_counts[func.__name__] += 1
return result
return wrapper
def print_stats(self):
print("\n===== FUNCIÓN TIMING STATS =====")
print(f"{'FUNCIÓN':<30} {'LLAMADAS':<10} {'TOTAL (s)':<15} {'PROMEDIO (s)':<15} {'% TIEMPO':<10}")
total_time = sum(sum(times) for times in self.function_times.values())
# Ordenar por tiempo total (descendente)
sorted_funcs = sorted(
self.function_times.items(),
key=lambda x: sum(x[1]),
reverse=True
)
for func_name, times in sorted_funcs:
total = sum(times)
avg = total / len(times) if times else 0
calls = self.call_counts[func_name]
percent = (total / total_time * 100) if total_time > 0 else 0
print(f"{func_name:<30} {calls:<10} {total:<15.4f} {avg:<15.4f} {percent:<10.2f}%")
print(f"\nTiempo total de procesamiento: {total_time:.4f} segundos")
print("================================")
def get_stats_dict(self):
"""Devuelve las estadísticas como un diccionario para mostrar en Streamlit"""
stats = []
total_time = sum(sum(times) for times in self.function_times.values())
for func_name, times in self.function_times.items():
total = sum(times)
avg = total / len(times) if times else 0
calls = self.call_counts[func_name]
percent = (total / total_time * 100) if total_time > 0 else 0
stats.append({
'función': func_name,
'llamadas': calls,
'tiempo_total': total,
'tiempo_promedio': avg,
'porcentaje': percent
})
# Ordenar por porcentaje de tiempo
stats.sort(key=lambda x: x['porcentaje'], reverse=True)
return stats, total_time
def reset(self):
"""Reiniciar las estadísticas"""
self.function_times.clear()
self.call_counts.clear()
profiler = Profiler()
class VideoProcessor:
def __init__(self):
self.cap = None
self.total_frames = 0
self.fps = 0
self.target_fps = 10
self.driver_crop_type = "Verstappen 2025" # Default driver crop type
self.load_crop_variables(self.driver_crop_type)
#self.yolo_model = YOLO("models/best.pt")
self.model = ort.InferenceSession(Path(BASE_DIR) / "models" / "best-224.onnx")
self.input_shape = (224, 224) # Match imgsz=224 from your original code
self.conf_thres = 0.5 # Confidence threshold
self.iou_thres = 0.5 # IoU threshold for NMS
self.frame_count = 0
self.mode = "Default" # Default to False, can be set later
self.video_name = "no_name"
self.frame_cache = OrderedDict()
self.frame_cache_size = 50 # Reduced size to conserve memory
self.last_position = -1
self.frames_list_end = {}
self.frames_list_start = {}
def clear_cache(self):
"""Clear the frame cache to free memory."""
self.frame_cache.clear()
@profiler.track_time
def load_crop_variables(self,driver_crop_type):
"""
Cargar variables de recorte según el tipo de conductor
"""
driver_config = {
"Albon 2024": {
"starty": 0.55,
"axes": 0.39,
"y_start": 0.53,
"x_center": 0.59
},
"Albon 2025": {
"starty": 0.67,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.59
},
"Alonso 2024": {
"starty": 0.5,
"axes": 0.29,
"y_start": 0.53,
"x_center": 0.56
},
"Alonso 2025": {
"starty": 0.8,
"axes": 0.5,
"y_start": 0.53,
"x_center": 0.572
},
"Bortoleto 2025": {
"starty": 0.6,
"axes": 0.4,
"y_start": 0.53,
"x_center": 0.572
},
"bottas": {
"starty": 0.67,
"axes": 0.43,
"y_start": 0.53,
"x_center": 0.574
},
"colapinto": {
"starty": 0.52,
"axes": 0.33,
"y_start": 0.53,
"x_center": 0.594
},
"Colapinto 2025": {
"starty": 0.54,
"axes": 0.4,
"y_start": 0.53,
"x_center": 0.58
},
"Gasly 2025": {
"starty": 0.57,
"axes": 0.35,
"y_start": 0.53,
"x_center": 0.58
},
"Hulk 2025": {
"starty": 0.73,
"axes": 0.3,
"y_start": 0.53,
"x_center": 0.548
},
"Lawson 2025": {
"starty": 0.68,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.555
},
"Ocon 2025": {
"starty": 0.65,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.585
},
"Sainz 2025": {
"starty": 0.77,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.57
},
"Stroll 2025": {
"starty": 0.6,
"axes": 0.45,
"y_start": 0.53,
"x_center": 0.565
},
"Bearman 2025": {
"starty": 0.72,
"axes": 0.45,
"y_start": 0.53,
"x_center": 0.58
},
"Hadjar 2025": {
"starty": 0.7,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.57
},
"hamilton-arabia": {
"starty": 0.908,
"axes": 0.4,
"y_start": 0.53,
"x_center": 0.554
},
"Hamilton 2025": {
"starty": 0.59,
"axes": 0.4,
"y_start": 0.53,
"x_center": 0.573
},
"hamilton-texas": {
"starty": 0.7,
"axes": 0.38,
"y_start": 0.53,
"x_center": 0.6
},
"leclerc-china": {
"starty": 0.6,
"axes": 0.36,
"y_start": 0.53,
"x_center": 0.58
},
"Leclerc 2025": {
"starty": 0.65,
"axes": 0.45,
"y_start": 0.53,
"x_center": 0.575
},
"magnussen": {
"starty": 0.6,
"axes": 0.34,
"y_start": 0.53,
"x_center": 0.58
},
"norris-arabia": {
"starty": 0.7,
"axes": 0.3,
"y_start": 0.53,
"x_center": 0.58
},
"norris-texas": {
"starty": 0.7,
"axes": 0.3,
"y_start": 0.53,
"x_center": 0.58
},
"Norris 2025": {
"starty": 0.79,
"axes": 0.6,
"y_start": 0.53,
"x_center": 0.571,
"helmet_height_ratio": 0.5
},
"ocon": {
"starty": 0.75,
"axes": 0.35,
"y_start": 0.53,
"x_center": 0.555
},
"piastri-azerbaiya": {
"starty": 0.65,
"axes": 0.34,
"y_start": 0.53,
"x_center": 0.549
},
"piastri-singapure": {
"starty": 0.65,
"axes": 0.34,
"y_start": 0.53,
"x_center": 0.549
},
'Piastri 2025': {
"starty": 0.93,
"axes": 0.59,
"y_start": 0.53,
"x_center": 0.573,
"helmet_height_ratio": 0.3
},
"russel-singapure": {
"starty": 0.63,
"axes": 0.44,
"y_start": 0.53,
"x_center": 0.56
},
"Russell 2025": {
"starty": 0.95,
"axes": 0.65,
"y_start": 0.53,
"x_center": 0.574,
"helmet_height_ratio": 0.35
},
"sainz": {
"starty": 0.57,
"axes": 0.32,
"y_start": 0.53,
"x_center": 0.59
},
"Tsunoda 2025":{
"starty": 0.92,
"axes": 0.55,
"y_start": 0.53,
"x_center": 0.58,
"helmet_height_ratio": 0.25
},
"verstappen_china": {
"starty": 0.7,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.57
},
"Verstappen 2025": {
"starty": 0.7,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.57,
"helmet_height_ratio": 0.4
},
"vertappen": {
"starty": 0.7,
"axes": 0.42,
"y_start": 0.53,
"x_center": 0.57
},
"verstappen-arabia": {
"starty": 0.95,
"axes": 0.4,
"y_start": 0.53,
"x_center": 0.565
},
"yuki": {
"starty": 0.64,
"axes": 0.37,
"y_start": 0.53,
"x_center": 0.585
},
"Antonelli 2025":
{
"starty": 0.97,
"axes": 0.65,
"y_start": 0.53,
"x_center": 0.595,
"helmet_height_ratio": 0.5
}}
#print(f"Driver crop type: {self.driver_crop_type}")
self.driver_crop_type = driver_crop_type
self.starty = driver_config[self.driver_crop_type]["starty"]
self.axes = driver_config[self.driver_crop_type]["axes"]
self.y_start = driver_config[self.driver_crop_type]["y_start"]
self.x_center = driver_config[self.driver_crop_type]["x_center"]
self.helmet_height_ratio = driver_config[self.driver_crop_type]["helmet_height_ratio"] if "helmet_height_ratio" in driver_config[self.driver_crop_type] else 0.5
def clean_up(self):
"""Release video capture and clear cache."""
self.clear_cache()
self.frames_list_start = {}
self.frames_list_end = {}
self.video_path = None
self.frame_count = 0
print("VideoProcessor cleaned up.")
@profiler.track_time
def load_video(self, video_file) -> bool:
"""Load video file and get basic information"""
tfile = tempfile.NamedTemporaryFile(delete=True)
tfile.write(video_file.read())
# Guardar ruta para posibles reinicios
self.video_path = tfile.name
# Obtener solo el nombre sin extensión (opcional)
self.video_name = os.path.splitext(os.path.basename(self.video_path))[0]
self.cap = cv2.VideoCapture(tfile.name)
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
#print(f"FPS: {self.fps}")
#print(f"Total frames: {self.total_frames}")
#self.frames_list_start = [None] * self.total_frames # prealocamos
#self.frames_list_end = [None] * self.total_frames # prealocamos
self.start_frame_min = 0
self.start_frame_max = min(100,int(self.total_frames * 0.1)) # 10% del total
if self.total_frames > 500:
self.end_frame_min = int(self.total_frames-100) # 90% del total
else:
self.end_frame_min = int(self.total_frames * 0.9)
self.end_frame_max = self.total_frames - 1
i = 0
#print(len(self.frames_list_start), len(self.frames_list_end))
if self.frames_list_end == {}:
current_frame_num = self.start_frame_min
cap_thread = cv2.VideoCapture(self.video_path)
cap_thread.set(cv2.CAP_PROP_POS_FRAMES, float(self.start_frame_min))
while current_frame_num <= self.start_frame_max:
ret, frame = cap_thread.read()
if not ret:
# print(f"Advertencia: No se pudo leer el frame {current_frame_num} de {video_path}.")
break
processed_frame = cv2.cvtColor(cv2.resize(frame, (256, 144), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY)
self.frames_list_start[current_frame_num] = processed_frame
current_frame_num += 1
cap_thread.release()
current_frame_num = self.end_frame_min
cap_thread = cv2.VideoCapture(self.video_path)
cap_thread.set(cv2.CAP_PROP_POS_FRAMES, float(self.end_frame_min))
while current_frame_num <= self.end_frame_max:
ret, frame = cap_thread.read()
if not ret:
# print(f"Advertencia: No se pudo leer el frame {current_frame_num} de {video_path}.")
break
processed_frame = cv2.cvtColor(cv2.resize(frame, (256, 144), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY)
self.frames_list_end[current_frame_num] = processed_frame
current_frame_num += 1
cap_thread.release()
'''while True:
ret, frame = self.cap.read()
if i >= start_frame_min and i <= start_frame_max:
self.frames_list_start[i] = cv2.cvtColor(cv2.resize(frame, (426,240), interpolation=cv2.INTER_LINEAR),cv2.COLOR_BGR2GRAY)
if i >= end_frame_min and i <= end_frame_max:
self.frames_list_end[i] = cv2.cvtColor(cv2.resize(frame, (426,240), interpolation=cv2.INTER_LINEAR),cv2.COLOR_BGR2GRAY)
if not ret or i >= self.total_frames:
break
i += 1'''
self.cap = cv2.VideoCapture(tfile.name)
return True
def load_video2(self, video_file, output_resolution=(854, 480)) -> bool:
"""
Load video file, resize to 480p, and get basic information.
Args:
video_file: Input video file object
output_resolution: Tuple of (width, height) for resizing (default: 854x480 for 480p)
Returns:
bool: True if successful, False otherwise
"""
try:
# Create temporary file to store the input video
tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
tfile.write(video_file.read())
tfile.close() # Close the file to allow VideoCapture to access it
# Store the temporary file path
self.video_path = tfile.name
# Load the video
self.cap = cv2.VideoCapture(tfile.name)
if not self.cap.isOpened():
print("Error: Could not open video file.")
return False
# Get original video properties
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
#print(f"FPS: {self.fps}")
#print(f"Total frames: {self.total_frames}")
# Prepare for resizing and saving to a new temporary file
output_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for MP4
out = cv2.VideoWriter(output_path, fourcc, self.fps, output_resolution)
# Process each frame
while self.cap.isOpened():
ret, frame = self.cap.read()
if not ret:
break
# Resize frame to 480p
resized_frame = cv2.resize(frame, output_resolution, interpolation=cv2.INTER_AREA)
out.write(resized_frame)
# Release resources
self.cap.release()
out.release()
# Update video path to the resized video
self.video_path = output_path
self.cap = cv2.VideoCapture(self.video_path)
if not self.cap.isOpened():
print("Error: Could not open resized video.")
return False
print(f"Video resized to {output_resolution} and saved to {output_path}")
return True
except Exception as e:
print(f"Error processing video: {str(e)}")
return False
def load_video1(self, video_file) -> bool:
"""Load video file and get basic information"""
with tempfile.TemporaryFile(suffix='.mp4') as tfile:
tfile.write(video_file.read())
tfile.seek(0)
self.video_path = tfile.name # Store for reference
self.cap = cv2.VideoCapture(tfile.name)
if not self.cap.isOpened():
return False
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
return True
@profiler.track_time
def get_frame1(self, frame_number: int) -> np.ndarray:
"""
Obtiene un frame específico del video con optimizaciones de rendimiento
Args:
frame_number: Número del frame a obtener
Returns:
Frame como array NumPy (formato RGB) o None si no está disponible
"""
if self.cap is None:
return None
# 1. Inicializar atributos de seguimiento si no existen
if not hasattr(self, 'frame_cache'):
# Usamos un diccionario limitado para caché de frames frecuentes
self.frame_cache = {}
self.frame_cache_size = 100 # Ajustar según memoria disponible
self.last_position = -1 # Para seguimiento de posición
# 2. Consultar caché primero (mejora extrema para frames accedidos repetidamente)
if frame_number in self.frame_cache:
return self.frame_cache[frame_number]
# 3. Optimización para acceso secuencial (evita seeks innecesarios)
if hasattr(self, 'last_position') and frame_number == self.last_position + 1:
# El frame solicitado es el siguiente al último leído
ret, frame = self.cap.read()
if ret:
self.last_position = frame_number
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#rgb_frame = frame
# Añadir al caché
self.frame_cache[frame_number] = rgb_frame
# Mantener tamaño del caché
if len(self.frame_cache) > self.frame_cache_size:
# Eliminar el frame más antiguo (menor número)
oldest = min(self.frame_cache.keys())
del self.frame_cache[oldest]
return rgb_frame
# Si falla la lectura, continuar con método directo
# 4. Acceso directo con mecanismo de reintento
for attempt in range(3): # Intentar hasta 3 veces si falla
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = self.cap.read()
if ret:
# Actualizar last_position para futuras optimizaciones secuenciales
self.last_position = frame_number
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Añadir al caché
self.frame_cache[frame_number] = rgb_frame
# Mantener tamaño del caché
if len(self.frame_cache) > self.frame_cache_size:
# Eliminar el frame más antiguo (menor número)
oldest = min(self.frame_cache.keys())
del self.frame_cache[oldest]
return rgb_frame
if attempt < 2: # No reintentar en el último intento
# Restaurar el objeto cap en caso de error
# Esto ayuda con formatos de video problemáticos
if hasattr(self, 'video_path') and self.video_path:
self.cap.release()
self.cap = cv2.VideoCapture(self.video_path)
# Si llegamos aquí, todos los intentos fallaron
return None
def get_frame(self, frame_number: int) -> np.ndarray:
if self.cap is None:
return None
'''if frame_number in self.frame_cache:
return self.frame_cache[frame_number]'''
if hasattr(self, 'last_position') and frame_number == self.last_position + 1:
ret, frame = self.cap.read()
if ret:
self.last_position = frame_number
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.frame_cache[frame_number] = rgb_frame
if len(self.frame_cache) > self.frame_cache_size:
self.frame_cache.popitem(last=False) # Remove oldest item
return cv2.resize(rgb_frame, (849, 477))
for attempt in range(3):
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = self.cap.read()
if ret:
self.last_position = frame_number
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.frame_cache[frame_number] = rgb_frame
if len(self.frame_cache) > self.frame_cache_size:
self.frame_cache.popitem(last=False)
return cv2.resize(rgb_frame, (854,480), interpolation=cv2.INTER_LINEAR)
if attempt < 2 and hasattr(self, 'video_path') and self.video_path:
self.cap.release()
self.cap = cv2.VideoCapture(self.video_path)
print(f"Error reading frame {frame_number}, retrying...")
return None
def get_frame_example(self, frame_number: int) -> np.ndarray:
"""
Obtiene un frame específico del video con optimizaciones de rendimiento
Args:
frame_number: Número del frame a obtener
Returns:
Frame como array NumPy (formato RGB) o None si no está disponible
"""
if self.cap is None:
return None
print(f"Frame number: {frame_number}")
# 1. Inicializar atributos de seguimiento si no existen
if not hasattr(self, 'frame_cache'):
# Usamos un diccionario limitado para caché de frames frecuentes
self.frame_cache = {}
self.frame_cache_size = 30 # Ajustar según memoria disponible
self.last_position = -1 # Para seguimiento de posición
# 2. Consultar caché primero (mejora extrema para frames accedidos repetidamente)
if frame_number in self.frame_cache:
return self.frame_cache[frame_number]
# 4. Acceso directo con mecanismo de reintento
for attempt in range(3): # Intentar hasta 3 veces si falla
try:
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = self.cap.read()
if ret:
# Actualizar last_position para futuras optimizaciones secuenciales
self.last_position = frame_number
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Añadir al caché
self.frame_cache[frame_number] = rgb_frame
# Mantener tamaño del caché
if len(self.frame_cache) > self.frame_cache_size:
# Eliminar el frame más antiguo (menor número)
oldest = min(self.frame_cache.keys())
del self.frame_cache[oldest]
return rgb_frame
except:
pass
if attempt < 2: # No reintentar en el último intento
# Restaurar el objeto cap en caso de error
# Esto ayuda con formatos de video problemáticos
if hasattr(self, 'video_path') and self.video_path:
self.cap.release()
self.cap = cv2.VideoCapture(self.video_path)
# Si llegamos aquí, todos los intentos fallaron
return None
@profiler.track_time
def mask_helmet_yolo(self, color_image: np.ndarray, helmet_height_ratio: float = 0.3, prev_mask: np.ndarray = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Usa YOLOv8 para segmentar el casco y lo pinta de verde.
Si se proporciona una máscara previa, la reutiliza.
Args:
color_image: Imagen en color (BGR).
helmet_height_ratio: Proporción de la imagen a considerar como región del casco (parte inferior).
prev_mask: Máscara previa para reutilizar (opcional).
Returns:
Tuple: (Imagen con la región del casco pintada de verde, Máscara generada o reutilizada).
"""
# Copia de la imagen
result_1 = color_image.copy()
height, width = color_image.shape[:2]
# Si hay una máscara previa, reutilizarla
if prev_mask is not None:
mask_final = prev_mask
else:
# Convertir la imagen a RGB (YOLOv8 espera imágenes en RGB)
image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
# Realizar la predicción con YOLOv8
results = self.yolo_model(image_rgb, conf=0.2, iou=0.5,imgsz=224) # Ajusta conf e iou según necesidad
# Inicializar máscara vacía
mask_final = np.zeros((height, width), dtype=np.uint8)
# Procesar los resultados de segmentación
if results[0].masks is not None:
for result in results:
masks = result.masks.data.cpu().numpy() # Máscaras de segmentación
boxes = result.boxes.xyxy.cpu().numpy() # Cajas delimitadoras
classes = result.boxes.cls.cpu().numpy() # Clases predichas
# Filtrar para la clase del casco (asumiendo que es la clase 0 o 'helmet')
# Si usas un modelo pre-entrenado en COCO, la clase 'helmet' no existe, usa 'person' (clase 0) y ROI
for i, cls in enumerate(classes):
# Ajusta según la clase de tu modelo. Ejemplo: clase 0 para 'helmet' en modelo personalizado
if int(cls) == 0: # Cambia según el índice de clase de tu modelo
# Obtener la máscara correspondiente
'''mask = masks[i]
# Redimensionar la máscara al tamaño de la imagen
mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
mask = (mask > 0).astype(np.uint8) * 255 # Convertir a binario (0 o 255)
# Opcional: Filtrar usando la ROI inferior para enfocarse en el casco
roi_height = int(height * helmet_height_ratio)
roi_mask = np.zeros((height, width), dtype=np.uint8)
roi_mask[height - roi_height:, :] = 255 # Parte inferior
mask = cv2.bitwise_and(mask, roi_mask)
# Combinar máscaras si hay múltiples detecciones
mask_final = cv2.bitwise_or(mask_final, mask)'''
mask = masks[i]
mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
mask = (mask > 0).astype(np.uint8) * 255
mask_final = cv2.bitwise_or(mask_final, mask)
# Refinar la máscara con operaciones morfológicas
kernel = np.ones((5, 5), np.uint8)
mask_final = cv2.erode(mask_final, kernel, iterations=1) # Eliminar ruido
mask_final = cv2.dilate(mask_final, kernel, iterations=3) # Expandir para cubrir el casco
else:
# Si no se detecta casco, devolver la imagen sin cambios y máscara vacía
print("No helmet detected in this frame.")
return result_1, mask_final
# Crear una imagen verde del mismo tamaño que la imagen original
green_color = np.zeros_like(color_image) # Crear una imagen vacía
green_color[:, :] = [125, 125, 125] # Color verde en BGR (0, 255, 0)
# Aplicar la máscara para pintar solo la región del casco
masked_green = cv2.bitwise_and(green_color, green_color, mask=mask_final)
# Crear máscara invertida para conservar el resto de la imagen
mask_inv = cv2.bitwise_not(mask_final)
# Combinar la región verde con el resto de la imagen original
result_original = cv2.bitwise_and(result_1, result_1, mask=mask_inv)
result = cv2.add(masked_green, result_original)
return result, mask_final
def mask_helmet(self, img):
"""Mask the helmet region using SAM and paint it green."""
print("Processing frame...")
img = cv2.resize(img, (224, 224), interpolation=cv2.INTER_LINEAR)
height, width = img.shape[:2]
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
outputs = self.model.run(None, {"images":preprocess_image_tensor(img)})
flag,result = postprocess_outputs(outputs, height, width)
# Procesar los resultados de segmentación
if flag is True:
result_image = img.copy()
overlay = np.zeros_like(img, dtype=np.uint8)
color = (125, 125, 125, 255) # RGBA color for the helmet
# Extract RGB and alpha from color
fill_color = color[:3] # (R, G, B) = (125, 125, 125)
alpha = color[3] / 255.0 # Normalize alpha to [0, 1]
for obj in result:
x1, y1, x2, y2, _, _, _, polygon = obj
# Translate polygon coordinates relative to (x1, y1)
polygon = [(round(x1 + point[0]), round(y1 + point[1])) for point in polygon]
# Convert polygon to format required by cv2.fillPoly
pts = np.array(polygon, dtype=np.int32).reshape((-1, 1, 2))
# Draw filled polygon on overlay
cv2.fillPoly(overlay, [pts], fill_color)
# Create alpha mask for blending
mask = np.any(overlay != 0, axis=2).astype(np.float32)
alpha_mask = mask * alpha
for c in range(3): # For each color channel
result_image[:, :, c] = (1 - alpha_mask) * result_image[:, :, c] + alpha_mask * overlay[:, :, c]
return result_image
else:
# Si no se detecta casco, devolver la imagen sin cambios y máscara vacía
print("No helmet detected in this frame.")
return img
def extract_frames1(self, start_frame: int, end_frame: int, fps_target: int = 10) -> List[np.ndarray]:
"""
Extract frames con procesamiento vectorizado para mayor rendimiento, actualizando la máscara cada 10 frames.
"""
frames, crude_frames = [], []
# Calculate the total number of frames in the selection
total_frames_selection = end_frame - start_frame + 1
# Calculate the duration of the selection in seconds
selection_duration = total_frames_selection / self.fps
# Calculate total frames to extract based on target fps
frames_to_extract = int(selection_duration * fps_target)
frames_to_extract = max(1, frames_to_extract)
# Vectorizar cálculo de índices
if frames_to_extract < total_frames_selection:
frame_indices = np.linspace(start_frame, end_frame, frames_to_extract, dtype=int)
else:
frame_indices = np.arange(start_frame, end_frame + 1)
counter = 0
# Procesamiento por lotes para reducir sobrecarga de función
BATCH_SIZE =150
last_mask = None # Almacenar la última máscara generada
for i in range(0, len(frame_indices), BATCH_SIZE):
batch_indices = frame_indices[i:i+BATCH_SIZE]
batch_frames = []
# Extract the frames in the current batch
for frame_num in batch_indices:
frame = self.get_frame(frame_num)
if frame is not None:
batch_frames.append((frame_num, frame))
# Process the batch of frames
if batch_frames:
for idx, (frame_num, frame) in enumerate(batch_frames):
cropped = self.crop_frame(frame)
result = self.mask_helmet(cropped)
clahe_image = self.apply_clahe(result)
threshold_image = self.apply_treshold(clahe_image)
frames.append(threshold_image)
return frames, crude_frames
def extract_frames(self, start_frame: int, end_frame: int, fps_target: int = 10) -> List[np.ndarray]:
frames, crude_frames = [], []
total_frames_selection = end_frame - start_frame + 1
selection_duration = total_frames_selection / self.fps
frames_to_extract = max(1, int(selection_duration * fps_target))
frame_indices = np.linspace(start_frame, end_frame, frames_to_extract, dtype=int) if frames_to_extract < total_frames_selection else np.arange(start_frame, end_frame + 1)
BATCH_SIZE = 64
def process_frame(frame_data):
frame_num, frame = frame_data
if frame is None:
return None
cropped = self.crop_frame(frame)
result = self.mask_helmet(cropped)
clahe_image = self.apply_clahe(result)
threshold_image = self.apply_treshold(clahe_image)
return threshold_image
for i in range(0, len(frame_indices), BATCH_SIZE):
batch_indices = frame_indices[i:i+BATCH_SIZE]
batch_frames = [(idx, self.get_frame(idx)) for idx in batch_indices]
with ThreadPoolExecutor(max_workers=2) as executor: # Adjust max_workers based on CPU cores
batch_results = list(executor.map(process_frame, [f for f in batch_frames if f[1] is not None]))
frames.extend([r for r in batch_results if r is not None])
return frames, crude_frames
@profiler.track_time
def crop_frame(self,image):
if image is None:
print(f"Error loading")
return None
height, width, _ = image.shape
# Use the bottom half of the image
#y_start = int(height * 0.53)
# 55% of the height
y_start = int(height * self.y_start) # 55% of the height
crop_height = height - y_start # height of bottom half
square_size = crop_height # base crop height
# Increase width by 30%: new_width equals 130% of square_size
new_width = square_size
# Shift the crop center 20% to the right.
# Calculate the desired center position.
#x_center = int(width * 0.57)
x_center = int(width * self.x_center)
x_start = max(0, x_center - new_width // 2)
x_end = x_start + new_width
# Adapt the crop if x_end exceeds the image width
if x_end > width:
x_end = width
x_start = max(0, width - new_width)
# Crop the image: bottom half in height and new_width in horizontal dimension
cropped_image = image[y_start:y_start+crop_height, x_start:x_end]
return cropped_image
def crop_frame_example(self,image):
if image is None:
print(f"Error loading")
return None
height, width, _ = image.shape
# Use the bottom half of the image
#y_start = int(height * 0.53)
# 55% of the height
y_start = int(height * self.y_start) # 55% of the height
crop_height = height - y_start # height of bottom half
square_size = crop_height # base crop height
# Increase width by 30%: new_width equals 130% of square_size
new_width = square_size
# Shift the crop center 20% to the right.
# Calculate the desired center position.
#x_center = int(width * 0.57)
x_center = int(width * self.x_center)
x_start = max(0, x_center - new_width // 2)
x_end = x_start + new_width
# Adapt the crop if x_end exceeds the image width
if x_end > width:
x_end = width
x_start = max(0, width - new_width)
# Crop the image: bottom half in height and new_width in horizontal dimension
cropped_image = image[y_start:y_start+crop_height, x_start:x_end]
cropped_image = recortar_imagen(cropped_image,self.starty, self.axes)
cropped_image = recortar_imagen_again(cropped_image,self.starty, self.axes)
#print(self.starty, self.axes, self.y_start, self.x_center)
return cropped_image
@profiler.track_time
def apply_clahe(self, image):
image = recortar_imagen(image,self.starty, self.axes)
if self.mode == "Default":
clahe_image = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(3, 3)).apply(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))
elif self.mode == "Low ilumination":
clahe_image = cv2.createCLAHE(clipLimit=7.0, tileGridSize=(3, 3)).apply(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))
#clahe_image = cv2.equalizeHist(image)
return clahe_image
@profiler.track_time
def apply_treshold(self, image):
#try:
# Process the image with adaptive edge detection (target 6% de bordes)
'''_, edges, _, config = adaptive_edge_detection(
image,
min_edge_percentage=3,
max_edge_percentage=6,
target_percentage=5,
max_attempts=5
)'''
percentage = calculate_black_pixels_percentage(image)
_, edges, _, config = adaptive_edge_detection(
image,
min_edge_percentage=percentage,
max_edge_percentage=percentage,
target_percentage=percentage,
max_attempts=1,
mode = self.mode
)
# Save the edge image
if edges is not None:
edges = recortar_imagen_again(edges,self.starty, self.axes)
return edges
def __del__(self):
if self.cap is not None:
self.cap.release()
self.clear_cache() # Ensure cache is cleared on object deletion