|
|
import cv2 |
|
|
import numpy as np |
|
|
from typing import List, Tuple |
|
|
import tempfile |
|
|
import time |
|
|
import functools |
|
|
from collections import defaultdict |
|
|
import onnxruntime as ort |
|
|
from utils.model_handler import ModelHandler |
|
|
from utils.helper import ( |
|
|
preprocess_image_tensor, |
|
|
postprocess_outputs, |
|
|
recortar_imagen, |
|
|
recortar_imagen_again, |
|
|
calculate_black_pixels_percentage, |
|
|
adaptive_edge_detection, |
|
|
|
|
|
) |
|
|
from collections import OrderedDict |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from pathlib import Path |
|
|
from utils.helper import BASE_DIR |
|
|
import os |
|
|
|
|
|
class Profiler:
    """Singleton that records wall-clock execution time per decorated function.

    Use :meth:`track_time` as a decorator; every call of the wrapped function
    is timed and accumulated so that :meth:`print_stats` /
    :meth:`get_stats_dict` can report totals, averages and each function's
    share of the overall runtime.
    """

    _instance = None

    def __new__(cls):
        # Classic singleton: every Profiler() call shares one set of counters.
        if cls._instance is None:
            cls._instance = super(Profiler, cls).__new__(cls)
            cls._instance.function_times = defaultdict(list)  # name -> [elapsed, ...]
            cls._instance.call_counts = defaultdict(int)      # name -> call count
        return cls._instance

    def track_time(self, func):
        """Decorator that records the elapsed time of every call to *func*."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic and higher-resolution than time.time(),
            # so short calls are measured reliably even if the wall clock is
            # adjusted while the program runs.
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start_time

            self.function_times[func.__name__].append(elapsed)
            self.call_counts[func.__name__] += 1

            return result
        return wrapper

    def print_stats(self):
        """Print a table of per-function timing statistics to stdout."""
        print("\n===== FUNCIÓN TIMING STATS =====")
        print(f"{'FUNCIÓN':<30} {'LLAMADAS':<10} {'TOTAL (s)':<15} {'PROMEDIO (s)':<15} {'% TIEMPO':<10}")

        total_time = sum(sum(times) for times in self.function_times.values())

        # Most expensive functions first.
        sorted_funcs = sorted(
            self.function_times.items(),
            key=lambda x: sum(x[1]),
            reverse=True
        )

        for func_name, times in sorted_funcs:
            total = sum(times)
            avg = total / len(times) if times else 0
            calls = self.call_counts[func_name]
            percent = (total / total_time * 100) if total_time > 0 else 0

            print(f"{func_name:<30} {calls:<10} {total:<15.4f} {avg:<15.4f} {percent:<10.2f}%")

        print(f"\nTiempo total de procesamiento: {total_time:.4f} segundos")
        print("================================")

    def get_stats_dict(self):
        """Return ``(stats, total_time)`` for display in Streamlit.

        *stats* is a list of per-function dicts (keys: ``función``,
        ``llamadas``, ``tiempo_total``, ``tiempo_promedio``, ``porcentaje``)
        sorted by descending share of the total runtime.
        """
        stats = []
        total_time = sum(sum(times) for times in self.function_times.values())

        for func_name, times in self.function_times.items():
            total = sum(times)
            avg = total / len(times) if times else 0
            calls = self.call_counts[func_name]
            percent = (total / total_time * 100) if total_time > 0 else 0

            stats.append({
                'función': func_name,
                'llamadas': calls,
                'tiempo_total': total,
                'tiempo_promedio': avg,
                'porcentaje': percent
            })

        stats.sort(key=lambda x: x['porcentaje'], reverse=True)
        return stats, total_time

    def reset(self):
        """Discard all recorded timings and call counts."""
        self.function_times.clear()
        self.call_counts.clear()
|
|
|
|
|
# Module-level singleton; methods below are decorated with @profiler.track_time.
profiler = Profiler()
|
|
|
|
|
|
|
|
class VideoProcessor: |
|
|
    def __init__(self):
        """Initialise processing state and load the ONNX segmentation model."""
        # OpenCV capture handle; assigned once a video is loaded.
        self.cap = None
        self.total_frames = 0
        self.fps = 0
        # Target sampling rate (frames per second of video) for extraction.
        self.target_fps = 10
        # Default driver/camera crop preset; fills starty/axes/y_start/x_center.
        self.driver_crop_type = "Verstappen 2025"
        self.load_crop_variables(self.driver_crop_type)

        # ONNX session for helmet segmentation; model expects 224x224 input.
        # NOTE(review): older onnxruntime versions require a str path — confirm
        # a Path object is accepted by the installed version.
        self.model = ort.InferenceSession(Path(BASE_DIR) / "models" / "best-224.onnx")
        self.input_shape = (224, 224)
        self.conf_thres = 0.5
        self.iou_thres = 0.5
        self.frame_count = 0
        # Processing mode; selects the CLAHE strength ("Default" / "Low ilumination").
        self.mode = "Default"
        self.video_name = "no_name"

        # Bounded LRU cache of decoded RGB frames (insertion-ordered dict).
        self.frame_cache = OrderedDict()
        self.frame_cache_size = 50
        # Index of the last sequentially-read frame (-1 = none yet).
        self.last_position = -1

        # Small grayscale previews of the video's last/first sections,
        # filled by load_video for start/end-frame selection.
        self.frames_list_end = {}
        self.frames_list_start = {}
|
|
|
|
|
def clear_cache(self): |
|
|
"""Clear the frame cache to free memory.""" |
|
|
self.frame_cache.clear() |
|
|
|
|
|
@profiler.track_time |
|
|
def load_crop_variables(self,driver_crop_type): |
|
|
""" |
|
|
Cargar variables de recorte según el tipo de conductor |
|
|
""" |
|
|
driver_config = { |
|
|
"Albon 2024": { |
|
|
"starty": 0.55, |
|
|
"axes": 0.39, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.59 |
|
|
}, |
|
|
"Albon 2025": { |
|
|
"starty": 0.67, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.59 |
|
|
}, |
|
|
"Alonso 2024": { |
|
|
"starty": 0.5, |
|
|
"axes": 0.29, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.56 |
|
|
}, |
|
|
"Alonso 2025": { |
|
|
"starty": 0.8, |
|
|
"axes": 0.5, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.572 |
|
|
}, |
|
|
"Bortoleto 2025": { |
|
|
"starty": 0.6, |
|
|
"axes": 0.4, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.572 |
|
|
}, |
|
|
"bottas": { |
|
|
"starty": 0.67, |
|
|
"axes": 0.43, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.574 |
|
|
}, |
|
|
"colapinto": { |
|
|
"starty": 0.52, |
|
|
"axes": 0.33, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.594 |
|
|
}, |
|
|
"Colapinto 2025": { |
|
|
"starty": 0.54, |
|
|
"axes": 0.4, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58 |
|
|
}, |
|
|
"Gasly 2025": { |
|
|
"starty": 0.57, |
|
|
"axes": 0.35, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58 |
|
|
}, |
|
|
"Hulk 2025": { |
|
|
"starty": 0.73, |
|
|
"axes": 0.3, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.548 |
|
|
}, |
|
|
"Lawson 2025": { |
|
|
"starty": 0.68, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.555 |
|
|
}, |
|
|
"Ocon 2025": { |
|
|
"starty": 0.65, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.585 |
|
|
}, |
|
|
"Sainz 2025": { |
|
|
"starty": 0.77, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.57 |
|
|
}, |
|
|
"Stroll 2025": { |
|
|
"starty": 0.6, |
|
|
"axes": 0.45, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.565 |
|
|
}, |
|
|
"Bearman 2025": { |
|
|
"starty": 0.72, |
|
|
"axes": 0.45, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58 |
|
|
}, |
|
|
"Hadjar 2025": { |
|
|
"starty": 0.7, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.57 |
|
|
}, |
|
|
"hamilton-arabia": { |
|
|
"starty": 0.908, |
|
|
"axes": 0.4, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.554 |
|
|
}, |
|
|
"Hamilton 2025": { |
|
|
"starty": 0.59, |
|
|
"axes": 0.4, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.573 |
|
|
}, |
|
|
|
|
|
"hamilton-texas": { |
|
|
"starty": 0.7, |
|
|
"axes": 0.38, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.6 |
|
|
}, |
|
|
"leclerc-china": { |
|
|
"starty": 0.6, |
|
|
"axes": 0.36, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58 |
|
|
}, |
|
|
|
|
|
"Leclerc 2025": { |
|
|
"starty": 0.65, |
|
|
"axes": 0.45, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.575 |
|
|
}, |
|
|
"magnussen": { |
|
|
"starty": 0.6, |
|
|
"axes": 0.34, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58 |
|
|
}, |
|
|
"norris-arabia": { |
|
|
"starty": 0.7, |
|
|
"axes": 0.3, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58 |
|
|
}, |
|
|
"norris-texas": { |
|
|
"starty": 0.7, |
|
|
"axes": 0.3, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58 |
|
|
}, |
|
|
"Norris 2025": { |
|
|
"starty": 0.79, |
|
|
"axes": 0.6, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.571, |
|
|
"helmet_height_ratio": 0.5 |
|
|
}, |
|
|
"ocon": { |
|
|
"starty": 0.75, |
|
|
"axes": 0.35, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.555 |
|
|
}, |
|
|
"piastri-azerbaiya": { |
|
|
"starty": 0.65, |
|
|
"axes": 0.34, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.549 |
|
|
}, |
|
|
"piastri-singapure": { |
|
|
"starty": 0.65, |
|
|
"axes": 0.34, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.549 |
|
|
}, |
|
|
'Piastri 2025': { |
|
|
"starty": 0.93, |
|
|
"axes": 0.59, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.573, |
|
|
"helmet_height_ratio": 0.3 |
|
|
}, |
|
|
"russel-singapure": { |
|
|
"starty": 0.63, |
|
|
"axes": 0.44, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.56 |
|
|
}, |
|
|
"Russell 2025": { |
|
|
"starty": 0.95, |
|
|
"axes": 0.65, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.574, |
|
|
"helmet_height_ratio": 0.35 |
|
|
}, |
|
|
"sainz": { |
|
|
"starty": 0.57, |
|
|
"axes": 0.32, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.59 |
|
|
}, |
|
|
|
|
|
|
|
|
"Tsunoda 2025":{ |
|
|
"starty": 0.92, |
|
|
"axes": 0.55, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.58, |
|
|
"helmet_height_ratio": 0.25 |
|
|
}, |
|
|
"verstappen_china": { |
|
|
"starty": 0.7, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.57 |
|
|
}, |
|
|
"Verstappen 2025": { |
|
|
"starty": 0.7, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.57, |
|
|
"helmet_height_ratio": 0.4 |
|
|
}, |
|
|
"vertappen": { |
|
|
"starty": 0.7, |
|
|
"axes": 0.42, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.57 |
|
|
}, |
|
|
"verstappen-arabia": { |
|
|
"starty": 0.95, |
|
|
"axes": 0.4, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.565 |
|
|
}, |
|
|
"yuki": { |
|
|
"starty": 0.64, |
|
|
"axes": 0.37, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.585 |
|
|
}, |
|
|
"Antonelli 2025": |
|
|
{ |
|
|
"starty": 0.97, |
|
|
"axes": 0.65, |
|
|
"y_start": 0.53, |
|
|
"x_center": 0.595, |
|
|
"helmet_height_ratio": 0.5 |
|
|
}} |
|
|
|
|
|
|
|
|
self.driver_crop_type = driver_crop_type |
|
|
self.starty = driver_config[self.driver_crop_type]["starty"] |
|
|
self.axes = driver_config[self.driver_crop_type]["axes"] |
|
|
|
|
|
self.y_start = driver_config[self.driver_crop_type]["y_start"] |
|
|
self.x_center = driver_config[self.driver_crop_type]["x_center"] |
|
|
self.helmet_height_ratio = driver_config[self.driver_crop_type]["helmet_height_ratio"] if "helmet_height_ratio" in driver_config[self.driver_crop_type] else 0.5 |
|
|
|
|
|
def clean_up(self): |
|
|
"""Release video capture and clear cache.""" |
|
|
|
|
|
self.clear_cache() |
|
|
self.frames_list_start = {} |
|
|
self.frames_list_end = {} |
|
|
self.video_path = None |
|
|
self.frame_count = 0 |
|
|
print("VideoProcessor cleaned up.") |
|
|
|
|
|
@profiler.track_time |
|
|
def load_video(self, video_file) -> bool: |
|
|
"""Load video file and get basic information""" |
|
|
tfile = tempfile.NamedTemporaryFile(delete=True) |
|
|
tfile.write(video_file.read()) |
|
|
|
|
|
|
|
|
self.video_path = tfile.name |
|
|
|
|
|
|
|
|
self.video_name = os.path.splitext(os.path.basename(self.video_path))[0] |
|
|
|
|
|
self.cap = cv2.VideoCapture(tfile.name) |
|
|
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
self.fps = int(self.cap.get(cv2.CAP_PROP_FPS)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.start_frame_min = 0 |
|
|
self.start_frame_max = min(100,int(self.total_frames * 0.1)) |
|
|
|
|
|
if self.total_frames > 500: |
|
|
self.end_frame_min = int(self.total_frames-100) |
|
|
else: |
|
|
self.end_frame_min = int(self.total_frames * 0.9) |
|
|
self.end_frame_max = self.total_frames - 1 |
|
|
i = 0 |
|
|
|
|
|
|
|
|
if self.frames_list_end == {}: |
|
|
|
|
|
|
|
|
|
|
|
current_frame_num = self.start_frame_min |
|
|
cap_thread = cv2.VideoCapture(self.video_path) |
|
|
cap_thread.set(cv2.CAP_PROP_POS_FRAMES, float(self.start_frame_min)) |
|
|
|
|
|
while current_frame_num <= self.start_frame_max: |
|
|
ret, frame = cap_thread.read() |
|
|
if not ret: |
|
|
|
|
|
break |
|
|
|
|
|
processed_frame = cv2.cvtColor(cv2.resize(frame, (256, 144), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY) |
|
|
self.frames_list_start[current_frame_num] = processed_frame |
|
|
current_frame_num += 1 |
|
|
|
|
|
cap_thread.release() |
|
|
|
|
|
|
|
|
current_frame_num = self.end_frame_min |
|
|
cap_thread = cv2.VideoCapture(self.video_path) |
|
|
cap_thread.set(cv2.CAP_PROP_POS_FRAMES, float(self.end_frame_min)) |
|
|
|
|
|
while current_frame_num <= self.end_frame_max: |
|
|
ret, frame = cap_thread.read() |
|
|
if not ret: |
|
|
|
|
|
break |
|
|
|
|
|
processed_frame = cv2.cvtColor(cv2.resize(frame, (256, 144), interpolation=cv2.INTER_LINEAR), cv2.COLOR_BGR2GRAY) |
|
|
self.frames_list_end[current_frame_num] = processed_frame |
|
|
current_frame_num += 1 |
|
|
|
|
|
cap_thread.release() |
|
|
|
|
|
'''while True: |
|
|
ret, frame = self.cap.read() |
|
|
|
|
|
if i >= start_frame_min and i <= start_frame_max: |
|
|
|
|
|
self.frames_list_start[i] = cv2.cvtColor(cv2.resize(frame, (426,240), interpolation=cv2.INTER_LINEAR),cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
if i >= end_frame_min and i <= end_frame_max: |
|
|
self.frames_list_end[i] = cv2.cvtColor(cv2.resize(frame, (426,240), interpolation=cv2.INTER_LINEAR),cv2.COLOR_BGR2GRAY) |
|
|
|
|
|
if not ret or i >= self.total_frames: |
|
|
break |
|
|
|
|
|
i += 1''' |
|
|
|
|
|
self.cap = cv2.VideoCapture(tfile.name) |
|
|
return True |
|
|
|
|
|
|
|
|
    def load_video2(self, video_file, output_resolution=(854, 480)) -> bool:
        """
        Load video file, resize to 480p, and get basic information.

        Args:
            video_file: Input video file object
            output_resolution: Tuple of (width, height) for resizing (default: 854x480 for 480p)

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Persist the uploaded bytes to a named temp file OpenCV can open.
            tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
            tfile.write(video_file.read())
            tfile.close()

            self.video_path = tfile.name

            self.cap = cv2.VideoCapture(tfile.name)
            if not self.cap.isOpened():
                print("Error: Could not open video file.")
                return False

            self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
            self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))

            # Re-encode the whole video at the target resolution.
            # NOTE(review): both temp files use delete=False and are never
            # removed here — confirm cleanup happens elsewhere.
            output_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, self.fps, output_resolution)

            while self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret:
                    break

                # INTER_AREA is the usual interpolation choice for downscaling.
                resized_frame = cv2.resize(frame, output_resolution, interpolation=cv2.INTER_AREA)
                out.write(resized_frame)

            self.cap.release()
            out.release()

            # Point the processor at the resized copy from now on.
            self.video_path = output_path
            self.cap = cv2.VideoCapture(self.video_path)
            if not self.cap.isOpened():
                print("Error: Could not open resized video.")
                return False

            print(f"Video resized to {output_resolution} and saved to {output_path}")
            return True

        except Exception as e:
            print(f"Error processing video: {str(e)}")
            return False
|
|
|
|
|
def load_video1(self, video_file) -> bool: |
|
|
"""Load video file and get basic information""" |
|
|
with tempfile.TemporaryFile(suffix='.mp4') as tfile: |
|
|
tfile.write(video_file.read()) |
|
|
tfile.seek(0) |
|
|
self.video_path = tfile.name |
|
|
self.cap = cv2.VideoCapture(tfile.name) |
|
|
if not self.cap.isOpened(): |
|
|
return False |
|
|
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
self.fps = int(self.cap.get(cv2.CAP_PROP_FPS)) |
|
|
return True |
|
|
|
|
|
    @profiler.track_time
    def get_frame1(self, frame_number: int) -> np.ndarray:
        """
        Fetch a specific frame from the video with performance optimisations.

        Args:
            frame_number: Index of the frame to fetch.

        Returns:
            Frame as a NumPy array (RGB format), or None when unavailable.
        """
        if self.cap is None:
            return None

        # Defensive lazy init in case __init__ didn't set up the cache.
        if not hasattr(self, 'frame_cache'):
            self.frame_cache = {}
            self.frame_cache_size = 100
            self.last_position = -1

        # Cache hit: return the decoded frame directly.
        if frame_number in self.frame_cache:
            return self.frame_cache[frame_number]

        # Sequential fast path: reading the very next frame avoids a seek.
        if hasattr(self, 'last_position') and frame_number == self.last_position + 1:
            ret, frame = self.cap.read()
            if ret:
                self.last_position = frame_number
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                self.frame_cache[frame_number] = rgb_frame

                # Evict the lowest-numbered frame when the cache overflows.
                if len(self.frame_cache) > self.frame_cache_size:
                    oldest = min(self.frame_cache.keys())
                    del self.frame_cache[oldest]

                return rgb_frame

        # Random access: seek, read, retry up to 3 times on failure.
        for attempt in range(3):
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = self.cap.read()

            if ret:
                self.last_position = frame_number
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                self.frame_cache[frame_number] = rgb_frame

                if len(self.frame_cache) > self.frame_cache_size:
                    oldest = min(self.frame_cache.keys())
                    del self.frame_cache[oldest]

                return rgb_frame

            if attempt < 2:
                # Read failed: re-open the capture from the stored path.
                if hasattr(self, 'video_path') and self.video_path:
                    self.cap.release()
                    self.cap = cv2.VideoCapture(self.video_path)

        return None
|
|
|
|
|
def get_frame(self, frame_number: int) -> np.ndarray: |
|
|
|
|
|
if self.cap is None: |
|
|
return None |
|
|
|
|
|
'''if frame_number in self.frame_cache: |
|
|
return self.frame_cache[frame_number]''' |
|
|
|
|
|
if hasattr(self, 'last_position') and frame_number == self.last_position + 1: |
|
|
ret, frame = self.cap.read() |
|
|
if ret: |
|
|
self.last_position = frame_number |
|
|
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
self.frame_cache[frame_number] = rgb_frame |
|
|
if len(self.frame_cache) > self.frame_cache_size: |
|
|
self.frame_cache.popitem(last=False) |
|
|
return cv2.resize(rgb_frame, (849, 477)) |
|
|
|
|
|
for attempt in range(3): |
|
|
|
|
|
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) |
|
|
ret, frame = self.cap.read() |
|
|
if ret: |
|
|
self.last_position = frame_number |
|
|
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
self.frame_cache[frame_number] = rgb_frame |
|
|
if len(self.frame_cache) > self.frame_cache_size: |
|
|
self.frame_cache.popitem(last=False) |
|
|
|
|
|
return cv2.resize(rgb_frame, (854,480), interpolation=cv2.INTER_LINEAR) |
|
|
|
|
|
if attempt < 2 and hasattr(self, 'video_path') and self.video_path: |
|
|
self.cap.release() |
|
|
self.cap = cv2.VideoCapture(self.video_path) |
|
|
|
|
|
print(f"Error reading frame {frame_number}, retrying...") |
|
|
return None |
|
|
|
|
|
def get_frame_example(self, frame_number: int) -> np.ndarray: |
|
|
""" |
|
|
Obtiene un frame específico del video con optimizaciones de rendimiento |
|
|
|
|
|
Args: |
|
|
frame_number: Número del frame a obtener |
|
|
|
|
|
Returns: |
|
|
Frame como array NumPy (formato RGB) o None si no está disponible |
|
|
""" |
|
|
if self.cap is None: |
|
|
return None |
|
|
print(f"Frame number: {frame_number}") |
|
|
|
|
|
|
|
|
if not hasattr(self, 'frame_cache'): |
|
|
|
|
|
self.frame_cache = {} |
|
|
self.frame_cache_size = 30 |
|
|
self.last_position = -1 |
|
|
|
|
|
|
|
|
if frame_number in self.frame_cache: |
|
|
return self.frame_cache[frame_number] |
|
|
|
|
|
|
|
|
for attempt in range(3): |
|
|
try: |
|
|
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) |
|
|
ret, frame = self.cap.read() |
|
|
|
|
|
if ret: |
|
|
|
|
|
self.last_position = frame_number |
|
|
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
self.frame_cache[frame_number] = rgb_frame |
|
|
|
|
|
|
|
|
if len(self.frame_cache) > self.frame_cache_size: |
|
|
|
|
|
oldest = min(self.frame_cache.keys()) |
|
|
del self.frame_cache[oldest] |
|
|
|
|
|
return rgb_frame |
|
|
except: |
|
|
pass |
|
|
|
|
|
if attempt < 2: |
|
|
|
|
|
|
|
|
if hasattr(self, 'video_path') and self.video_path: |
|
|
self.cap.release() |
|
|
self.cap = cv2.VideoCapture(self.video_path) |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
|
|
    @profiler.track_time
    def mask_helmet_yolo(self, color_image: np.ndarray, helmet_height_ratio: float = 0.3, prev_mask: np.ndarray = None) -> Tuple[np.ndarray, np.ndarray]:
        """
        Uses YOLOv8 to segment the helmet and paints the region grey.
        If a previous mask is provided, it is reused instead of re-running inference.

        Args:
            color_image: Colour image (BGR).
            helmet_height_ratio: Fraction of the image treated as the helmet region (bottom part); currently unused in the active code path.
            prev_mask: Previous mask to reuse (optional).

        Returns:
            Tuple: (image with the helmet region painted, generated or reused mask).

        NOTE(review): relies on ``self.yolo_model``, which is not assigned in
        the visible ``__init__`` — confirm it is set elsewhere before calling.
        """

        result_1 = color_image.copy()
        height, width = color_image.shape[:2]

        if prev_mask is not None:
            # Reuse the caller-supplied mask and skip inference entirely.
            mask_final = prev_mask
        else:
            # The model is fed RGB.
            image_rgb = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)

            results = self.yolo_model(image_rgb, conf=0.2, iou=0.5,imgsz=224)

            mask_final = np.zeros((height, width), dtype=np.uint8)

            if results[0].masks is not None:
                for result in results:
                    masks = result.masks.data.cpu().numpy()
                    boxes = result.boxes.xyxy.cpu().numpy()
                    classes = result.boxes.cls.cpu().numpy()

                    for i, cls in enumerate(classes):
                        # Class id 0 is treated as the helmet class — TODO confirm.
                        if int(cls) == 0:

                            '''mask = masks[i]
                            # Redimensionar la máscara al tamaño de la imagen
                            mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
                            mask = (mask > 0).astype(np.uint8) * 255 # Convertir a binario (0 o 255)

                            # Opcional: Filtrar usando la ROI inferior para enfocarse en el casco
                            roi_height = int(height * helmet_height_ratio)
                            roi_mask = np.zeros((height, width), dtype=np.uint8)
                            roi_mask[height - roi_height:, :] = 255 # Parte inferior
                            mask = cv2.bitwise_and(mask, roi_mask)




                            # Combinar máscaras si hay múltiples detecciones
                            mask_final = cv2.bitwise_or(mask_final, mask)'''

                            # Resize each detection mask to the frame, binarise
                            # to 0/255, and OR it into the combined mask.
                            mask = masks[i]
                            mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
                            mask = (mask > 0).astype(np.uint8) * 255
                            mask_final = cv2.bitwise_or(mask_final, mask)

                # Morphology: one erosion trims noise, three dilations widen
                # the mask past the helmet edge.
                kernel = np.ones((5, 5), np.uint8)
                mask_final = cv2.erode(mask_final, kernel, iterations=1)
                mask_final = cv2.dilate(mask_final, kernel, iterations=3)

            else:
                # No detections: return the untouched copy and the empty mask.
                print("No helmet detected in this frame.")
                return result_1, mask_final

        # Solid grey (125,125,125) layer applied wherever the mask is set.
        green_color = np.zeros_like(color_image)
        green_color[:, :] = [125, 125, 125]

        masked_green = cv2.bitwise_and(green_color, green_color, mask=mask_final)

        mask_inv = cv2.bitwise_not(mask_final)

        # Keep the original pixels outside the mask, grey inside.
        result_original = cv2.bitwise_and(result_1, result_1, mask=mask_inv)
        result = cv2.add(masked_green, result_original)

        return result, mask_final
|
|
|
|
|
    def mask_helmet(self, img):
        """Mask the helmet region using the ONNX segmentation model and blend a grey overlay onto it.

        Returns the 224x224 RGB image with detections blended in, or the plain
        resized RGB image when nothing was detected.
        """
        print("Processing frame...")

        # The model expects 224x224 RGB input.
        img = cv2.resize(img, (224, 224), interpolation=cv2.INTER_LINEAR)
        height, width = img.shape[:2]
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # postprocess_outputs yields (detection_found_flag, detections).
        outputs = self.model.run(None, {"images":preprocess_image_tensor(img)})
        flag,result = postprocess_outputs(outputs, height, width)

        if flag is True:
            result_image = img.copy()
            overlay = np.zeros_like(img, dtype=np.uint8)
            # RGBA-style colour: grey fill with a fully opaque alpha channel.
            color = (125, 125, 125, 255)

            fill_color = color[:3]
            alpha = color[3] / 255.0

            for obj in result:
                # Each detection: box corners, three unused fields, and a
                # polygon given relative to the box's top-left corner —
                # presumably the layout produced by postprocess_outputs; verify.
                x1, y1, x2, y2, _, _, _, polygon = obj

                # Shift polygon points into absolute image coordinates.
                polygon = [(round(x1 + point[0]), round(y1 + point[1])) for point in polygon]

                pts = np.array(polygon, dtype=np.int32).reshape((-1, 1, 2))

                cv2.fillPoly(overlay, [pts], fill_color)

            # Alpha-blend the overlay onto the frame only where polygons were drawn.
            mask = np.any(overlay != 0, axis=2).astype(np.float32)
            alpha_mask = mask * alpha

            for c in range(3):
                result_image[:, :, c] = (1 - alpha_mask) * result_image[:, :, c] + alpha_mask * overlay[:, :, c]

            return result_image
        else:
            # No detection: hand back the resized RGB frame unchanged.
            print("No helmet detected in this frame.")
            return img
|
|
|
|
|
def extract_frames1(self, start_frame: int, end_frame: int, fps_target: int = 10) -> List[np.ndarray]: |
|
|
""" |
|
|
Extract frames con procesamiento vectorizado para mayor rendimiento, actualizando la máscara cada 10 frames. |
|
|
""" |
|
|
frames, crude_frames = [], [] |
|
|
|
|
|
|
|
|
total_frames_selection = end_frame - start_frame + 1 |
|
|
|
|
|
|
|
|
selection_duration = total_frames_selection / self.fps |
|
|
|
|
|
|
|
|
frames_to_extract = int(selection_duration * fps_target) |
|
|
frames_to_extract = max(1, frames_to_extract) |
|
|
|
|
|
|
|
|
if frames_to_extract < total_frames_selection: |
|
|
frame_indices = np.linspace(start_frame, end_frame, frames_to_extract, dtype=int) |
|
|
else: |
|
|
frame_indices = np.arange(start_frame, end_frame + 1) |
|
|
counter = 0 |
|
|
|
|
|
BATCH_SIZE =150 |
|
|
last_mask = None |
|
|
|
|
|
for i in range(0, len(frame_indices), BATCH_SIZE): |
|
|
batch_indices = frame_indices[i:i+BATCH_SIZE] |
|
|
batch_frames = [] |
|
|
|
|
|
|
|
|
|
|
|
for frame_num in batch_indices: |
|
|
frame = self.get_frame(frame_num) |
|
|
if frame is not None: |
|
|
batch_frames.append((frame_num, frame)) |
|
|
|
|
|
|
|
|
if batch_frames: |
|
|
for idx, (frame_num, frame) in enumerate(batch_frames): |
|
|
cropped = self.crop_frame(frame) |
|
|
|
|
|
result = self.mask_helmet(cropped) |
|
|
|
|
|
clahe_image = self.apply_clahe(result) |
|
|
|
|
|
threshold_image = self.apply_treshold(clahe_image) |
|
|
|
|
|
frames.append(threshold_image) |
|
|
|
|
|
return frames, crude_frames |
|
|
|
|
|
def extract_frames(self, start_frame: int, end_frame: int, fps_target: int = 10) -> List[np.ndarray]: |
|
|
frames, crude_frames = [], [] |
|
|
|
|
|
total_frames_selection = end_frame - start_frame + 1 |
|
|
selection_duration = total_frames_selection / self.fps |
|
|
frames_to_extract = max(1, int(selection_duration * fps_target)) |
|
|
frame_indices = np.linspace(start_frame, end_frame, frames_to_extract, dtype=int) if frames_to_extract < total_frames_selection else np.arange(start_frame, end_frame + 1) |
|
|
|
|
|
BATCH_SIZE = 64 |
|
|
|
|
|
def process_frame(frame_data): |
|
|
frame_num, frame = frame_data |
|
|
if frame is None: |
|
|
return None |
|
|
cropped = self.crop_frame(frame) |
|
|
result = self.mask_helmet(cropped) |
|
|
clahe_image = self.apply_clahe(result) |
|
|
threshold_image = self.apply_treshold(clahe_image) |
|
|
return threshold_image |
|
|
|
|
|
for i in range(0, len(frame_indices), BATCH_SIZE): |
|
|
batch_indices = frame_indices[i:i+BATCH_SIZE] |
|
|
batch_frames = [(idx, self.get_frame(idx)) for idx in batch_indices] |
|
|
with ThreadPoolExecutor(max_workers=2) as executor: |
|
|
batch_results = list(executor.map(process_frame, [f for f in batch_frames if f[1] is not None])) |
|
|
frames.extend([r for r in batch_results if r is not None]) |
|
|
|
|
|
return frames, crude_frames |
|
|
|
|
|
|
|
|
@profiler.track_time |
|
|
def crop_frame(self,image): |
|
|
|
|
|
|
|
|
if image is None: |
|
|
print(f"Error loading") |
|
|
return None |
|
|
|
|
|
height, width, _ = image.shape |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
y_start = int(height * self.y_start) |
|
|
crop_height = height - y_start |
|
|
square_size = crop_height |
|
|
|
|
|
|
|
|
new_width = square_size |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
x_center = int(width * self.x_center) |
|
|
x_start = max(0, x_center - new_width // 2) |
|
|
x_end = x_start + new_width |
|
|
|
|
|
|
|
|
if x_end > width: |
|
|
x_end = width |
|
|
x_start = max(0, width - new_width) |
|
|
|
|
|
|
|
|
cropped_image = image[y_start:y_start+crop_height, x_start:x_end] |
|
|
|
|
|
|
|
|
return cropped_image |
|
|
|
|
|
def crop_frame_example(self,image): |
|
|
|
|
|
if image is None: |
|
|
print(f"Error loading") |
|
|
return None |
|
|
|
|
|
height, width, _ = image.shape |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
y_start = int(height * self.y_start) |
|
|
crop_height = height - y_start |
|
|
square_size = crop_height |
|
|
|
|
|
|
|
|
new_width = square_size |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
x_center = int(width * self.x_center) |
|
|
x_start = max(0, x_center - new_width // 2) |
|
|
x_end = x_start + new_width |
|
|
|
|
|
|
|
|
if x_end > width: |
|
|
x_end = width |
|
|
x_start = max(0, width - new_width) |
|
|
|
|
|
|
|
|
cropped_image = image[y_start:y_start+crop_height, x_start:x_end] |
|
|
cropped_image = recortar_imagen(cropped_image,self.starty, self.axes) |
|
|
cropped_image = recortar_imagen_again(cropped_image,self.starty, self.axes) |
|
|
|
|
|
return cropped_image |
|
|
|
|
|
@profiler.track_time |
|
|
def apply_clahe(self, image): |
|
|
|
|
|
image = recortar_imagen(image,self.starty, self.axes) |
|
|
if self.mode == "Default": |
|
|
clahe_image = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(3, 3)).apply(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)) |
|
|
|
|
|
elif self.mode == "Low ilumination": |
|
|
clahe_image = cv2.createCLAHE(clipLimit=7.0, tileGridSize=(3, 3)).apply(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)) |
|
|
|
|
|
return clahe_image |
|
|
|
|
|
@profiler.track_time |
|
|
def apply_treshold(self, image): |
|
|
|
|
|
|
|
|
|
|
|
'''_, edges, _, config = adaptive_edge_detection( |
|
|
image, |
|
|
min_edge_percentage=3, |
|
|
max_edge_percentage=6, |
|
|
target_percentage=5, |
|
|
max_attempts=5 |
|
|
)''' |
|
|
percentage = calculate_black_pixels_percentage(image) |
|
|
_, edges, _, config = adaptive_edge_detection( |
|
|
image, |
|
|
min_edge_percentage=percentage, |
|
|
max_edge_percentage=percentage, |
|
|
target_percentage=percentage, |
|
|
max_attempts=1, |
|
|
mode = self.mode |
|
|
) |
|
|
|
|
|
|
|
|
if edges is not None: |
|
|
edges = recortar_imagen_again(edges,self.starty, self.axes) |
|
|
return edges |
|
|
|
|
|
|
|
|
def __del__(self): |
|
|
if self.cap is not None: |
|
|
self.cap.release() |
|
|
self.clear_cache() |
|
|
|
|
|
|