import cv2 import os from pathlib import Path import numpy as np import torch import segmentation_models_pytorch as smp import pandas as pd from tqdm import tqdm def select_and_crop_video(input_video_path, output_video_path, display_scale=0.5): """ Permite seleccionar ROI de un video y guarda versión recortada Args: input_video_path: ruta al video original output_video_path: ruta donde guardar video recortado display_scale: escala para mostrar frame (0.5 = 50% del tamaño) Returns: (x, y, w, h): coordenadas del ROI seleccionado """ print(f"Abriendo video: {input_video_path}") # Abrir video cap = cv2.VideoCapture(input_video_path) if not cap.isOpened(): print(f"Error: No se pudo abrir el video") return None # Obtener propiedades del video fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"Video info:") print(f" Resolución: {width}x{height}") print(f" FPS: {fps}") print(f" Total frames: {total_frames}") print(f" Duración: {total_frames / fps:.2f} segundos") # Leer primer frame ret, first_frame = cap.read() if not ret: print("Error: No se pudo leer el primer frame") cap.release() return None # Redimensionar para visualización si es necesario display_height, display_width = first_frame.shape[:2] if display_scale != 1.0: display_width = int(width * display_scale) display_height = int(height * display_scale) frame_display = cv2.resize(first_frame, (display_width, display_height)) else: frame_display = first_frame.copy() print(f"\nSelecciona el ROI del tubo con el mouse") print("Presiona ENTER cuando termines, ESC para cancelar") # Seleccionar ROI roi = cv2.selectROI("Seleccionar ROI - Presiona ENTER cuando termines", frame_display, fromCenter=False) cv2.destroyAllWindows() if roi[2] == 0 or roi[3] == 0: print("Selección cancelada") cap.release() return None # Escalar coordenadas al tamaño original si se redimensionó if display_scale != 1.0: x = int(roi[0] / display_scale) y = int(roi[1] / display_scale) w = int(roi[2] / display_scale) h = int(roi[3] / display_scale) else: x, y, w, h = roi print(f"\nROI seleccionado (tamaño original):") print(f" x={x}, y={y}, width={w}, height={h}") # Mostrar preview del ROI preview = first_frame[y:y + h, x:x + w] if preview.shape[0] > 800: preview_scale = 800 / preview.shape[0] preview_w = int(preview.shape[1] * preview_scale) preview_h = int(preview.shape[0] * preview_scale) preview_display = cv2.resize(preview, (preview_w, preview_h)) else: preview_display = preview cv2.imshow("Preview ROI - Presiona cualquier tecla para continuar", preview_display) cv2.waitKey(0) cv2.destroyAllWindows() # Confirmar respuesta = input(f"\n¿Proceder a recortar el video completo? (s/n): ") if respuesta.lower() != 's': print("Operación cancelada") cap.release() return None # Crear directorio de salida si no existe Path(os.path.dirname(output_video_path)).mkdir(parents=True, exist_ok=True) # Crear video writer para video recortado fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_video_path, fourcc, fps, (w, h)) # Reiniciar captura al inicio cap.set(cv2.CAP_PROP_POS_FRAMES, 0) print(f"\nRecortando video...") print(f"De: {width}x{height} → A: {w}x{h}") frame_count = 0 while True: ret, frame = cap.read() if not ret: break # Recortar frame cropped = frame[y:y + h, x:x + w] # Escribir al video de salida out.write(cropped) frame_count += 1 if frame_count % 100 == 0: print(f" Procesados {frame_count}/{total_frames} frames ({frame_count / total_frames * 100:.1f}%)") # Liberar recursos cap.release() out.release() # Mostrar info del archivo resultante original_size = os.path.getsize(input_video_path) / (1024 ** 3) # GB cropped_size = os.path.getsize(output_video_path) / (1024 ** 3) # GB print(f"\n✓ Video recortado guardado: {output_video_path}") print(f" Frames procesados: {frame_count}") print(f" Tamaño original: {original_size:.2f} GB") print(f" Tamaño recortado: {cropped_size:.2f} GB") print(f" Reducción: {(1 - cropped_size / original_size) * 100:.1f}%") return (x, y, w, h) # ==================== FUNCIONES DE VIDEO Y FRAMES ===================================================================== def extract_frames_from_video_xx(video_path, output_dir, time_interval_seconds): """ Extrae TODOS los frames de un video y los guarda Args: video_path: ruta al video output_dir: carpeta donde guardar frames time_interval_seconds: intervalo real entre capturas (para metadata) Returns: int: número de frames extraídos """ Path(output_dir).mkdir(parents=True, exist_ok=True) cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"\nExtrayendo frames de: {os.path.basename(video_path)}") print(f" FPS: {fps}") print(f" Total frames: {total_frames}") print(f" Tiempo real total: {total_frames * time_interval_seconds / 60:.1f} minutos") frame_count = 0 while True: ret, frame = cap.read() if not ret: break real_time_seconds = frame_count * time_interval_seconds filename = f"frame_{frame_count:04d}_t{real_time_seconds}s.jpg" output_path = os.path.join(output_dir, filename) cv2.imwrite(output_path, frame) frame_count += 1 cap.release() print(f"✓ {frame_count} frames extraídos en: {output_dir}") return frame_count def extract_frames_from_video(video_path, output_dir, time_interval_seconds): """ Extrae TODOS los frames de un video y los guarda Args: video_path: ruta al video output_dir: carpeta donde guardar frames time_interval_seconds: intervalo real entre capturas (para metadata) Returns: tuple: (frames_list, frame_count) - lista de rutas de frames y número total """ Path(output_dir).mkdir(parents=True, exist_ok=True) cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"\nExtrayendo frames de: {os.path.basename(video_path)}") print(f" FPS: {fps}") print(f" Total frames: {total_frames}") print(f" Tiempo real total: {total_frames * time_interval_seconds / 60:.1f} minutos") frame_count = 0 frames_list = [] # ← AGREGAR ESTA LISTA while True: ret, frame = cap.read() if not ret: break real_time_seconds = frame_count * time_interval_seconds filename = f"frame_{frame_count:04d}_t{real_time_seconds}s.jpg" output_path = os.path.join(output_dir, filename) cv2.imwrite(output_path, frame) frames_list.append(output_path) # ← AGREGAR A LA LISTA frame_count += 1 cap.release() print(f"✓ {frame_count} frames extraídos en: {output_dir}") return frames_list, frame_count # ← RETORNAR AMBOS # ==================== FUNCIONES DE MODELO ============================================================================= def load_unet_model(model_path, device): """Cargar modelo U-Net entrenado""" model = smp.Unet( encoder_name="resnet34", encoder_weights="imagenet", in_channels=3, classes=1, activation=None ) checkpoint = torch.load(model_path, map_location=device) model.load_state_dict(checkpoint['model_state_dict']) model = model.to(device) model.eval() print(f"✓ Modelo cargado: {os.path.basename(model_path)}") print(f" Época: {checkpoint.get('epoch', 'N/A')}") print(f" Val Loss: {checkpoint.get('val_loss', 'N/A'):.4f}") return model def predict_interface_mask(model, image, device): """Predecir máscara de interfaz para una imagen""" img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) img = img.astype(np.float32) / 255.0 img_tensor = torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0) img_tensor = img_tensor.to(device) with torch.no_grad(): output = model(img_tensor) output = torch.sigmoid(output) mask = output.squeeze().cpu().numpy() mask_binary = (mask > 0.5).astype(np.uint8) * 255 return mask_binary def extract_interface_points(mask_binary): """Extraer puntos (x,y) de la línea de interfaz desde máscara binaria""" height, width = mask_binary.shape interface_points = [] for x in range(width): column = mask_binary[:, x] white_pixels = np.where(column == 255)[0] if len(white_pixels) > 0: y_interface = white_pixels[0] interface_points.append((x, y_interface)) return interface_points def draw_interface_overlay(image, interface_points, show_text=True): """ Dibujar overlay de interfaz en imagen - Línea verde: interfaz completa - Línea azul: promedio horizontal Returns: (img_result, avg_height): imagen con overlay y altura promedio """ img_result = image.copy() if len(interface_points) == 0: return img_result, None # Línea verde (interfaz completa) pts = np.array(interface_points, dtype=np.int32) cv2.polylines(img_result, [pts], isClosed=False, color=(0, 255, 0), thickness=2) # Calcular altura promedio avg_y = int(np.mean([p[1] for p in interface_points])) height_from_bottom = image.shape[0] - avg_y # Línea azul horizontal (promedio) cv2.line(img_result, (0, avg_y), (image.shape[1], avg_y), color=(255, 0, 0), thickness=2) # Texto opcional if show_text: text = f"Altura: {height_from_bottom:.1f} px" cv2.putText(img_result, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) return img_result, height_from_bottom # ==================== PROCESAMIENTO COMPLETO ==================== def process_video_complete(model, frames_dir, output_frames_dir, output_video_path, output_csv_path, time_interval, fps=30, device='cuda'): """ Procesamiento completo: frames → detección → video + CSV Args: model: modelo U-Net cargado frames_dir: carpeta con frames originales output_frames_dir: carpeta para guardar frames procesados output_video_path: ruta del video final output_csv_path: ruta del CSV de resultados time_interval: segundos entre frames originales fps: fps del video de salida device: 'cuda' o 'cpu' Returns: DataFrame con resultados """ Path(output_frames_dir).mkdir(parents=True, exist_ok=True) # Obtener frames all_frames = sorted([f for f in os.listdir(frames_dir) if f.endswith('.jpg')]) print(f"\n{'=' * 60}") print(f"PROCESANDO {len(all_frames)} FRAMES") print(f"{'=' * 60}\n") results = [] # Procesar cada frame for i, frame_name in enumerate(tqdm(all_frames, desc="Procesando frames")): frame_path = os.path.join(frames_dir, frame_name) image = cv2.imread(frame_path) # Predecir interfaz mask_binary = predict_interface_mask(model, image, device) interface_points = extract_interface_points(mask_binary) # Dibujar overlay img_result, height_px = draw_interface_overlay(image, interface_points) # Guardar frame procesado output_path = os.path.join(output_frames_dir, frame_name) cv2.imwrite(output_path, img_result) # Guardar resultados time_seconds = i * time_interval results.append({ 'frame_number': i, 'frame_name': frame_name, 'time_seconds': time_seconds, 'time_minutes': time_seconds / 60, 'height_pixels': height_px, }) # Crear DataFrame df = pd.DataFrame(results) # Guardar CSV df.to_csv(output_csv_path, index=False) print(f"\n✓ CSV guardado: {output_csv_path}") # Crear video print(f"\nGenerando video...") frames = sorted([f for f in os.listdir(output_frames_dir) if f.endswith('.jpg')]) first_frame = cv2.imread(os.path.join(output_frames_dir, frames[0])) height, width, _ = first_frame.shape fourcc = cv2.VideoWriter_fourcc(*'mp4v') video = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) for frame_name in tqdm(frames, desc="Creando video"): frame = cv2.imread(os.path.join(output_frames_dir, frame_name)) video.write(frame) video.release() video_size = os.path.getsize(output_video_path) / (1024 ** 2) print(f"✓ Video guardado: {output_video_path} ({video_size:.1f} MB)") return df def plot_sedimentation_curve_xx(csv_path, output_plot_path=None): """Generar gráfica de curva de sedimentación desde CSV""" import matplotlib.pyplot as plt df = pd.read_csv(csv_path) fig, ax = plt.subplots(figsize=(12, 6)) ax.plot(df['time_minutes'], df['height_pixels'], 'b-', linewidth=2, label='Altura interfaz') ax.scatter(df['time_minutes'], df['height_pixels'], c='red', s=20, alpha=0.5) ax.set_xlabel('Tiempo (minutos)', fontsize=12, fontweight='bold') ax.set_ylabel('Altura interfaz (píxeles)', fontsize=12, fontweight='bold') ax.set_title('Curva de Sedimentación', fontsize=14, fontweight='bold') ax.grid(True, alpha=0.3) ax.legend() plt.tight_layout() if output_plot_path: plt.savefig(output_plot_path, dpi=150) print(f"✓ Gráfica guardada: {output_plot_path}") plt.show() return fig def plot_sedimentation_curve(data, output_plot_path=None): """ Generar gráfica de curva de sedimentación Args: data: DataFrame de pandas o ruta a archivo CSV output_plot_path: opcional, ruta para guardar la imagen """ import matplotlib.pyplot as plt # Si es string, cargar CSV; si es DataFrame, usar directamente if isinstance(data, str): df = pd.read_csv(data) else: df = data fig, ax = plt.subplots(figsize=(12, 6)) ax.plot(df['time_minutes'], df['height_pixels'], 'b-', linewidth=2, label='Altura interfaz') ax.scatter(df['time_minutes'], df['height_pixels'], c='red', s=20, alpha=0.5) ax.set_xlabel('Tiempo (minutos)', fontsize=12, fontweight='bold') ax.set_ylabel('Altura interfaz (píxeles)', fontsize=12, fontweight='bold') ax.set_title('Curva de Sedimentación', fontsize=14, fontweight='bold') ax.grid(True, alpha=0.3) ax.legend() plt.tight_layout() if output_plot_path: plt.savefig(output_plot_path, dpi=150) print(f"✓ Gráfica guardada: {output_plot_path}") return fig