import cv2 import numpy as np from typing import Tuple import tempfile import os from PIL import Image import sys from pymongo import MongoClient from dotenv import load_dotenv import os import streamlit as st try: if getattr(sys, 'frozen', False): # En el ejecutable, intentar sys._MEIPASS BASE_DIR = getattr(sys, '_MEIPASS', os.path.dirname(sys.executable)) print(f"Executable mode - Initial BASE_DIR: {BASE_DIR} (_MEIPASS: {hasattr(sys, '_MEIPASS')})") # Verificar si BASE_DIR contiene los archivos esperados expected_dirs = ['navigation', 'models', 'assets', 'img', 'utils'] if not any(os.path.exists(os.path.join(BASE_DIR, d)) for d in expected_dirs): print(f"Warning: Expected directories not found in {BASE_DIR}") # Buscar _MEI en el directorio padre temp_dir = os.path.dirname(BASE_DIR) if BASE_DIR != os.path.dirname(sys.executable) else BASE_DIR for d in os.listdir(temp_dir): if d.startswith('_MEI'): candidate = os.path.join(temp_dir, d) if any(os.path.exists(os.path.join(candidate, ed)) for ed in expected_dirs): BASE_DIR = candidate print(f"Adjusted BASE_DIR to _MEI directory: {BASE_DIR}") break else: print(f"No _MEI directory found in {temp_dir}, using {BASE_DIR}") else: # En desarrollo, usar el directorio del proyecto current_file = os.path.abspath(os.path.realpath(__file__)) print(f"Development mode - Current file: {current_file}") BASE_DIR = os.path.dirname(os.path.dirname(current_file)) # Subir de utils/ a F1-machine-learning-webapp/ print(f"Development mode - BASE_DIR: {BASE_DIR}") except Exception as e: print(f"Error setting BASE_DIR: {e}") # Fallback BASE_DIR = os.path.dirname(os.path.abspath(os.path.realpath(__file__))) BASE_DIR = os.path.dirname(BASE_DIR) print(f"Fallback BASE_DIR: {BASE_DIR}") BASE_DIR = os.path.normpath(BASE_DIR) print(f"Final BASE_DIR: {BASE_DIR}") #load_dotenv() # Carga las variables desde .env #mongo_uri = os.getenv("MONGO_URI") @st.cache_resource def get_mongo_client(): return MongoClient(os.getenv('MONGO_URI')) client = get_mongo_client() def get_metrics_collections(): db = client["f1_data"] metrics_collection = db["usage_metrics"] metrics_page = db["visits"] return metrics_collection, metrics_page, db metrics_collection, metrics_page, db = get_metrics_collections() '''if not metrics_page.find_one({"page": "inicio"}): metrics_page.insert_one({"page": "inicio", "visits": 0}) if not metrics_collection.find_one({"action": "descargar_app"}): metrics_collection.insert_one({"action": "descargar_app", "count": 0})''' '''except: print("Error loading MongoDB URI from .env file. Please check your configuration.") client = None metrics_collection = None metrics_page = None db = None''' #-------------YOLO ONNX HELPERS------------------- def preprocess_image_tensor(image_rgb: np.ndarray) -> np.ndarray: """Preprocess image to match Ultralytics YOLOv8.""" '''input = np.array(image_rgb) input = input.transpose(2, 0, 1) input = input.reshape(1,3,224,224).astype("float32") input = input/255.0''' input_data = image_rgb.transpose(2, 0, 1).reshape(1, 3, 224, 224) # Convert to float32 and normalize to [0, 1] input_data = input_data.astype(np.float32) / 255.0 return input_data def postprocess_outputs(outputs: list, height: int, width: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Process ONNX model outputs for a single-class model.""" res_size = 56 output0 = outputs[0] output1 = outputs[1] output0 = output0[0].transpose() output1 = output1[0] boxes = output0[:,0:5] masks = output0[:,5:] output1 = output1.reshape(32,res_size*res_size) masks = masks @ output1 boxes = np.hstack([boxes,masks]) yolo_classes = [ "helmet" ] # parse and filter all boxes objects = [] for row in boxes: xc,yc,w,h = row[:4] x1 = (xc-w/2)/224*width y1 = (yc-h/2)/224*height x2 = (xc+w/2)/224*width y2 = (yc+h/2)/224*height prob = row[4:5].max() if prob < 0.2: continue class_id = row[4:5].argmax() label = yolo_classes[class_id] mask = get_mask(row[5:25684], (x1,y1,x2,y2), width, height) try: polygon = get_polygon(mask) except: continue objects.append([x1,y1,x2,y2,label,prob,mask,polygon]) # apply non-maximum suppression objects.sort(key=lambda x: x[5], reverse=True) result = [] while len(objects)>0: result.append(objects[0]) objects = [object for object in objects if iou(object,objects[0])<0.7] return True,result def intersection(box1,box2): box1_x1,box1_y1,box1_x2,box1_y2 = box1[:4] box2_x1,box2_y1,box2_x2,box2_y2 = box2[:4] x1 = max(box1_x1,box2_x1) y1 = max(box1_y1,box2_y1) x2 = min(box1_x2,box2_x2) y2 = min(box1_y2,box2_y2) return (x2-x1)*(y2-y1) def union(box1,box2): box1_x1,box1_y1,box1_x2,box1_y2 = box1[:4] box2_x1,box2_y1,box2_x2,box2_y2 = box2[:4] box1_area = (box1_x2-box1_x1)*(box1_y2-box1_y1) box2_area = (box2_x2-box2_x1)*(box2_y2-box2_y1) return box1_area + box2_area - intersection(box1,box2) def iou(box1,box2): return intersection(box1,box2)/union(box1,box2) def sigmoid(z): return 1/(1 + np.exp(-z)) # parse segmentation mask def get_mask(row, box, img_width, img_height): # convert mask to image (matrix of pixels) res_size = 56 mask = row.reshape(res_size,res_size) mask = sigmoid(mask) mask = (mask > 0.2).astype("uint8")*255 # crop the object defined by "box" from mask x1,y1,x2,y2 = box mask_x1 = round(x1/img_width*res_size) mask_y1 = round(y1/img_height*res_size) mask_x2 = round(x2/img_width*res_size) mask_y2 = round(y2/img_height*res_size) mask = mask[mask_y1:mask_y2,mask_x1:mask_x2] # resize the cropped mask to the size of object img_mask = Image.fromarray(mask,"L") img_mask = img_mask.resize((round(x2-x1),round(y2-y1))) mask = np.array(img_mask) return mask # calculate bounding polygon from mask def get_polygon(mask): contours = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) polygon = [[contour[0][0],contour[0][1]] for contour in contours[0][0]] return polygon #------------------VIDEO CONVERSION------------------ def convert_video_to_10fps(video_file): """ Convert an uploaded video file to 10 FPS and return metadata Args: video_file: Streamlit uploaded file object Returns: Dictionary with video metadata and path to converted file """ try: # Create temporary file for the original upload orig_tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') orig_tfile.write(video_file.read()) orig_tfile.close() # Open the original video to get properties orig_cap = cv2.VideoCapture(orig_tfile.name) if not orig_cap.isOpened(): return {"success": False, "error": "Could not open video file"} orig_fps = orig_cap.get(cv2.CAP_PROP_FPS) width = int(orig_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(orig_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) orig_total_frames = int(orig_cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Calculate duration duration_seconds = orig_total_frames / orig_fps expected_frames = int(duration_seconds * 10) # 10 fps # Create output temp file converted_path = tempfile.mktemp(suffix='.mp4') # Create VideoWriter fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(converted_path, fourcc, 10, (width, height)) # Calculate frame sampling if orig_fps <= 10: # If original is slower than target, duplicate frames step = 1 duplication = int(10 / orig_fps) else: # If original is faster, skip frames step = orig_fps / 10 duplication = 1 # Convert the video frame_count = 0 output_count = 0 while orig_cap.isOpened(): ret, frame = orig_cap.read() if not ret: break # Determine if we should include this frame if frame_count % step < 1: # Using modulo < 1 for floating point step values # Write frame (possibly multiple times) for _ in range(duplication): out.write(frame) output_count += 1 frame_count += 1 # Release resources orig_cap.release() out.release() os.unlink(orig_tfile.name) # Delete original temp file # Instead of returning a dictionary, read the file back into memory with open(converted_path, "rb") as f: video_data = f.read() # Clean up the temporary file os.unlink(converted_path) # Return a file-like object from io import BytesIO video_io = BytesIO(video_data) video_io.name = "converted_10fps.mp4" return video_io except Exception as e: print(f"Error converting video: {e}") return None def recortar_imagen(image,starty_dic, axes_dic): height, width, _ = image.shape mask = np.zeros((height, width), dtype=np.uint8) start_y = int((starty_dic-.02) * height) cv2.rectangle(mask, (0, start_y), (width, height), 255, -1) center = (width // 2, start_y) axes = (width // 2, int(axes_dic * height)) cv2.ellipse(mask, center, axes, 0, 180, 360, 255, -1) result = cv2.bitwise_and(image, image, mask=mask) return result def recortar_imagen_again(image,starty_dic, axes_dic): try: height, width,_ = image.shape except : height, width = image.shape mask = np.zeros((height, width), dtype=np.uint8) start_y = int(starty_dic * height) cv2.rectangle(mask, (0, start_y), (width, height), 255, -1) center = (width // 2, start_y) axes = (width // 2, int(axes_dic * height)) cv2.ellipse(mask, center, axes, 0, 180, 360, 255, -1) result = cv2.bitwise_and(image, image, mask=mask) return result def calculate_black_pixels_percentage(image): """ Calcula el porcentaje de píxeles totalmente negros en la imagen. Args: image: Imagen cargada con cv2 (BGR o escala de grises). is_grayscale: True si la imagen ya está en escala de gruises, False si es a color. Returns: float: Porcentaje de píxeles negros. """ # Obtener dimensiones '''image = cv2.imread(image_path) image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)''' if image is None: print(f"Error loading image") return 0 if len(image.shape) == 3: image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) else: image = image.copy() h, w = image.shape[:2] total_pixels = h * w black_pixels = np.sum(image < 10) # Calcular porcentaje percentage = (black_pixels / total_pixels) * 100 percentage = (100.00 - float(percentage)) * .06 return percentage def create_rectangular_roi(height, width, x1=0, y1=0, x2=None, y2=None): if x2 is None: x2 = width if y2 is None: y2 = height mask = np.zeros((height, width), dtype=np.uint8) cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1) return mask def preprocess_image(image, mask=None): if len(image.shape) == 3: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) else: gray = image.copy() denoised = cv2.bilateralFilter(gray, d=3, sigmaColor=20, sigmaSpace=10) sharpened = cv2.addWeighted(denoised, 3.0, denoised, -2.0, 0) normalized = cv2.normalize(sharpened, None, 0, 255, cv2.NORM_MINMAX) if mask is not None: return cv2.bitwise_and(normalized, normalized, mask=mask) return normalized def calculate_robust_rms_contrast(image, mask=None, bright_threshold=240): if len(image.shape) == 3: image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if mask is not None: masked_image = image[mask > 0] else: masked_image = image.ravel() if len(masked_image) == 0: mean = np.mean(image) std_dev = np.sqrt(np.mean((image - mean) ** 2)) else: mask_bright = masked_image < bright_threshold masked_image = masked_image[mask_bright] if len(masked_image) == 0: mean = np.mean(image) std_dev = np.sqrt(np.mean((image - mean) ** 2)) else: mean = np.mean(masked_image) std_dev = np.sqrt(np.mean((masked_image - mean) ** 2)) return std_dev / 255.0 def adaptive_clahe_iterative(image, roi_mask, initial_clip_limit=1.0, max_clip_limit=10.0, iterations=20, target_rms_min=0.199, target_rms_max=0.5, bright_threshold=230): if len(image.shape) == 3: original_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) else: original_gray = image.copy() #preprocessed_image = preprocess_image(original_gray) best_image = original_gray.copy() best_rms = calculate_robust_rms_contrast(original_gray, roi_mask, bright_threshold) clip_limit = initial_clip_limit for i in range(iterations): clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8)) current_image = clahe.apply(original_gray) rms_contrast = calculate_robust_rms_contrast(current_image, roi_mask, bright_threshold) if target_rms_min <= rms_contrast <= target_rms_max: return current_image if rms_contrast > best_rms: best_rms = rms_contrast best_image = current_image.copy() if rms_contrast > target_rms_max: clip_limit = min(clip_limit, 1.0) else: clip_limit = min(initial_clip_limit + (i * 0.5), max_clip_limit) return best_image def adaptive_edge_detection(imagen, min_edge_percentage=5.5, max_edge_percentage=6.5, target_percentage=6.0, max_attempts=5,mode="Default"): """ Detecta bordes con ajuste progresivo de parámetros hasta lograr un porcentaje óptimo de píxeles de borde en la imagen - optimizado con operaciones vectorizadas. """ # Read image original = imagen if original is None: print(f"Error loading image") return None, None, None, None # Convert to grayscale gray = original # Calculate total pixels for percentage calculation total_pixels = gray.shape[0] * gray.shape[1] min_edge_pixels = int((min_edge_percentage / 100) * total_pixels) max_edge_pixels = int((max_edge_percentage / 100) * total_pixels) target_edge_pixels = int((target_percentage / 100) * total_pixels) # Initial parameters - ajustados para conseguir un rango alrededor del 6% de bordes clip_limits = [1] grid_sizes = [(2, 2)] # Empezamos con umbrales más altos para restringir la cantidad de bordes canny_thresholds = [(55, 170), (45, 160), (35, 150), (25, 140), (20, 130),(20, 130),(20, 130)] best_edges = None best_enhanced = None best_config = None best_edge_score = float('inf') # Inicializamos con un valor alto edge_percentage = 0 # Try progressively more aggressive parameters for attempt in range(max_attempts): # Get parameters for this attempt clip_limit = clip_limits[attempt] grid_size = grid_sizes[attempt] low_threshold, high_threshold = canny_thresholds[attempt] if edge_percentage <= max_edge_percentage: clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=grid_size) elif edge_count > max_edge_percentage: # Si hay demasiados bordes, aplicamos un CLAHE más fuerte clahe = cv2.createCLAHE(clipLimit=1, tileGridSize=grid_size) enhanced = clahe.apply(gray) #print("denoised shape:", denoised.shape, "dtype:", denoised.dtype) # Apply noise reduction for higher attempts '''if attempt >= 2: enhanced = cv2.bilateralFilter(enhanced, 5, 100, 100)''' if mode == "Default": denoised = cv2.bilateralFilter(enhanced, d=5, sigmaColor=200, sigmaSpace=200) median_intensity = np.median(denoised) low_threshold = max(20, (1.0 - .3) * median_intensity) high_threshold = max(80, (1.0 + .8) * median_intensity) elif mode == "Low ilumination": denoised = cv2.bilateralFilter(enhanced, d=5, sigmaColor=200, sigmaSpace=200) median_intensity = np.median(denoised) low_threshold = max(20, (1.0 - .3) * median_intensity) high_threshold = max(80, (1.0 + .8) * median_intensity) # Edge detection edges = cv2.Canny(denoised, low_threshold, high_threshold) std_intensity = np.std(edges) # Reducir ruido con operaciones morfológicas - vectorizado kernel = np.ones((1, 1), np.uint8) edges = cv2.morphologyEx( edges, cv2.MORPH_OPEN, kernel, iterations=0 if std_intensity < 60 else 1 # Más iteraciones si hay más ruido ) # Count edge pixels - vectorizado usando np.count_nonzero edge_count = np.count_nonzero(edges) edge_percentage = (edge_count / total_pixels) * 100 # Calcular distancia al objetivo - vectorizado edge_score = abs(edge_count - target_edge_pixels) # Record the best attempt (closest to target percentage) if edge_score < best_edge_score: best_edge_score = edge_score best_edges = edges.copy() # Hacer copia para evitar sobrescrituras best_enhanced = enhanced.copy() best_config = { 'attempt': attempt + 1, 'clip_limit': clip_limit, 'grid_size': grid_size, 'canny_thresholds': (low_threshold, high_threshold), 'edge_pixels': edge_count, 'edge_percentage': edge_percentage } # Salida temprana si estamos cerca del objetivo if abs(edge_percentage - target_percentage) < 0.1: # Within 0.2% of target break print(f"Mejor intento: {best_config['attempt']}, porcentaje de bordes: {edge_percentage:.2f}%") return best_enhanced, best_edges, original, best_config