settling-gradio / utils.py
dzjuca's picture
Upload 4 files
7ee7b32 verified
import cv2
import os
from pathlib import Path
import numpy as np
import torch
import segmentation_models_pytorch as smp
import pandas as pd
from tqdm import tqdm
def select_and_crop_video(input_video_path, output_video_path, display_scale=0.5):
"""
Permite seleccionar ROI de un video y guarda versión recortada
Args:
input_video_path: ruta al video original
output_video_path: ruta donde guardar video recortado
display_scale: escala para mostrar frame (0.5 = 50% del tamaño)
Returns:
(x, y, w, h): coordenadas del ROI seleccionado
"""
print(f"Abriendo video: {input_video_path}")
# Abrir video
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
print(f"Error: No se pudo abrir el video")
return None
# Obtener propiedades del video
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Video info:")
print(f" Resolución: {width}x{height}")
print(f" FPS: {fps}")
print(f" Total frames: {total_frames}")
print(f" Duración: {total_frames / fps:.2f} segundos")
# Leer primer frame
ret, first_frame = cap.read()
if not ret:
print("Error: No se pudo leer el primer frame")
cap.release()
return None
# Redimensionar para visualización si es necesario
display_height, display_width = first_frame.shape[:2]
if display_scale != 1.0:
display_width = int(width * display_scale)
display_height = int(height * display_scale)
frame_display = cv2.resize(first_frame, (display_width, display_height))
else:
frame_display = first_frame.copy()
print(f"\nSelecciona el ROI del tubo con el mouse")
print("Presiona ENTER cuando termines, ESC para cancelar")
# Seleccionar ROI
roi = cv2.selectROI("Seleccionar ROI - Presiona ENTER cuando termines", frame_display, fromCenter=False)
cv2.destroyAllWindows()
if roi[2] == 0 or roi[3] == 0:
print("Selección cancelada")
cap.release()
return None
# Escalar coordenadas al tamaño original si se redimensionó
if display_scale != 1.0:
x = int(roi[0] / display_scale)
y = int(roi[1] / display_scale)
w = int(roi[2] / display_scale)
h = int(roi[3] / display_scale)
else:
x, y, w, h = roi
print(f"\nROI seleccionado (tamaño original):")
print(f" x={x}, y={y}, width={w}, height={h}")
# Mostrar preview del ROI
preview = first_frame[y:y + h, x:x + w]
if preview.shape[0] > 800:
preview_scale = 800 / preview.shape[0]
preview_w = int(preview.shape[1] * preview_scale)
preview_h = int(preview.shape[0] * preview_scale)
preview_display = cv2.resize(preview, (preview_w, preview_h))
else:
preview_display = preview
cv2.imshow("Preview ROI - Presiona cualquier tecla para continuar", preview_display)
cv2.waitKey(0)
cv2.destroyAllWindows()
# Confirmar
respuesta = input(f"\n¿Proceder a recortar el video completo? (s/n): ")
if respuesta.lower() != 's':
print("Operación cancelada")
cap.release()
return None
# Crear directorio de salida si no existe
Path(os.path.dirname(output_video_path)).mkdir(parents=True, exist_ok=True)
# Crear video writer para video recortado
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (w, h))
# Reiniciar captura al inicio
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
print(f"\nRecortando video...")
print(f"De: {width}x{height} → A: {w}x{h}")
frame_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Recortar frame
cropped = frame[y:y + h, x:x + w]
# Escribir al video de salida
out.write(cropped)
frame_count += 1
if frame_count % 100 == 0:
print(f" Procesados {frame_count}/{total_frames} frames ({frame_count / total_frames * 100:.1f}%)")
# Liberar recursos
cap.release()
out.release()
# Mostrar info del archivo resultante
original_size = os.path.getsize(input_video_path) / (1024 ** 3) # GB
cropped_size = os.path.getsize(output_video_path) / (1024 ** 3) # GB
print(f"\n✓ Video recortado guardado: {output_video_path}")
print(f" Frames procesados: {frame_count}")
print(f" Tamaño original: {original_size:.2f} GB")
print(f" Tamaño recortado: {cropped_size:.2f} GB")
print(f" Reducción: {(1 - cropped_size / original_size) * 100:.1f}%")
return (x, y, w, h)
# ==================== FUNCIONES DE VIDEO Y FRAMES =====================================================================
def extract_frames_from_video_xx(video_path, output_dir, time_interval_seconds):
"""
Extrae TODOS los frames de un video y los guarda
Args:
video_path: ruta al video
output_dir: carpeta donde guardar frames
time_interval_seconds: intervalo real entre capturas (para metadata)
Returns:
int: número de frames extraídos
"""
Path(output_dir).mkdir(parents=True, exist_ok=True)
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"\nExtrayendo frames de: {os.path.basename(video_path)}")
print(f" FPS: {fps}")
print(f" Total frames: {total_frames}")
print(f" Tiempo real total: {total_frames * time_interval_seconds / 60:.1f} minutos")
frame_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
real_time_seconds = frame_count * time_interval_seconds
filename = f"frame_{frame_count:04d}_t{real_time_seconds}s.jpg"
output_path = os.path.join(output_dir, filename)
cv2.imwrite(output_path, frame)
frame_count += 1
cap.release()
print(f"✓ {frame_count} frames extraídos en: {output_dir}")
return frame_count
def extract_frames_from_video(video_path, output_dir, time_interval_seconds):
"""
Extrae TODOS los frames de un video y los guarda
Args:
video_path: ruta al video
output_dir: carpeta donde guardar frames
time_interval_seconds: intervalo real entre capturas (para metadata)
Returns:
tuple: (frames_list, frame_count) - lista de rutas de frames y número total
"""
Path(output_dir).mkdir(parents=True, exist_ok=True)
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"\nExtrayendo frames de: {os.path.basename(video_path)}")
print(f" FPS: {fps}")
print(f" Total frames: {total_frames}")
print(f" Tiempo real total: {total_frames * time_interval_seconds / 60:.1f} minutos")
frame_count = 0
frames_list = [] # ← AGREGAR ESTA LISTA
while True:
ret, frame = cap.read()
if not ret:
break
real_time_seconds = frame_count * time_interval_seconds
filename = f"frame_{frame_count:04d}_t{real_time_seconds}s.jpg"
output_path = os.path.join(output_dir, filename)
cv2.imwrite(output_path, frame)
frames_list.append(output_path) # ← AGREGAR A LA LISTA
frame_count += 1
cap.release()
print(f"✓ {frame_count} frames extraídos en: {output_dir}")
return frames_list, frame_count # ← RETORNAR AMBOS
# ==================== FUNCIONES DE MODELO =============================================================================
def load_unet_model(model_path, device):
"""Cargar modelo U-Net entrenado"""
model = smp.Unet(
encoder_name="resnet34",
encoder_weights="imagenet",
in_channels=3,
classes=1,
activation=None
)
checkpoint = torch.load(model_path, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model = model.to(device)
model.eval()
print(f"✓ Modelo cargado: {os.path.basename(model_path)}")
print(f" Época: {checkpoint.get('epoch', 'N/A')}")
print(f" Val Loss: {checkpoint.get('val_loss', 'N/A'):.4f}")
return model
def predict_interface_mask(model, image, device):
"""Predecir máscara de interfaz para una imagen"""
img = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img_tensor = torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0)
img_tensor = img_tensor.to(device)
with torch.no_grad():
output = model(img_tensor)
output = torch.sigmoid(output)
mask = output.squeeze().cpu().numpy()
mask_binary = (mask > 0.5).astype(np.uint8) * 255
return mask_binary
def extract_interface_points(mask_binary):
"""Extraer puntos (x,y) de la línea de interfaz desde máscara binaria"""
height, width = mask_binary.shape
interface_points = []
for x in range(width):
column = mask_binary[:, x]
white_pixels = np.where(column == 255)[0]
if len(white_pixels) > 0:
y_interface = white_pixels[0]
interface_points.append((x, y_interface))
return interface_points
def draw_interface_overlay(image, interface_points, show_text=True):
"""
Dibujar overlay de interfaz en imagen
- Línea verde: interfaz completa
- Línea azul: promedio horizontal
Returns:
(img_result, avg_height): imagen con overlay y altura promedio
"""
img_result = image.copy()
if len(interface_points) == 0:
return img_result, None
# Línea verde (interfaz completa)
pts = np.array(interface_points, dtype=np.int32)
cv2.polylines(img_result, [pts], isClosed=False, color=(0, 255, 0), thickness=2)
# Calcular altura promedio
avg_y = int(np.mean([p[1] for p in interface_points]))
height_from_bottom = image.shape[0] - avg_y
# Línea azul horizontal (promedio)
cv2.line(img_result, (0, avg_y), (image.shape[1], avg_y), color=(255, 0, 0), thickness=2)
# Texto opcional
if show_text:
text = f"Altura: {height_from_bottom:.1f} px"
cv2.putText(img_result, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX,
0.7, (255, 255, 255), 2)
return img_result, height_from_bottom
# ==================== PROCESAMIENTO COMPLETO ====================
def process_video_complete(model, frames_dir, output_frames_dir, output_video_path,
output_csv_path, time_interval, fps=30, device='cuda'):
"""
Procesamiento completo: frames → detección → video + CSV
Args:
model: modelo U-Net cargado
frames_dir: carpeta con frames originales
output_frames_dir: carpeta para guardar frames procesados
output_video_path: ruta del video final
output_csv_path: ruta del CSV de resultados
time_interval: segundos entre frames originales
fps: fps del video de salida
device: 'cuda' o 'cpu'
Returns:
DataFrame con resultados
"""
Path(output_frames_dir).mkdir(parents=True, exist_ok=True)
# Obtener frames
all_frames = sorted([f for f in os.listdir(frames_dir) if f.endswith('.jpg')])
print(f"\n{'=' * 60}")
print(f"PROCESANDO {len(all_frames)} FRAMES")
print(f"{'=' * 60}\n")
results = []
# Procesar cada frame
for i, frame_name in enumerate(tqdm(all_frames, desc="Procesando frames")):
frame_path = os.path.join(frames_dir, frame_name)
image = cv2.imread(frame_path)
# Predecir interfaz
mask_binary = predict_interface_mask(model, image, device)
interface_points = extract_interface_points(mask_binary)
# Dibujar overlay
img_result, height_px = draw_interface_overlay(image, interface_points)
# Guardar frame procesado
output_path = os.path.join(output_frames_dir, frame_name)
cv2.imwrite(output_path, img_result)
# Guardar resultados
time_seconds = i * time_interval
results.append({
'frame_number': i,
'frame_name': frame_name,
'time_seconds': time_seconds,
'time_minutes': time_seconds / 60,
'height_pixels': height_px,
})
# Crear DataFrame
df = pd.DataFrame(results)
# Guardar CSV
df.to_csv(output_csv_path, index=False)
print(f"\n✓ CSV guardado: {output_csv_path}")
# Crear video
print(f"\nGenerando video...")
frames = sorted([f for f in os.listdir(output_frames_dir) if f.endswith('.jpg')])
first_frame = cv2.imread(os.path.join(output_frames_dir, frames[0]))
height, width, _ = first_frame.shape
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
for frame_name in tqdm(frames, desc="Creando video"):
frame = cv2.imread(os.path.join(output_frames_dir, frame_name))
video.write(frame)
video.release()
video_size = os.path.getsize(output_video_path) / (1024 ** 2)
print(f"✓ Video guardado: {output_video_path} ({video_size:.1f} MB)")
return df
def plot_sedimentation_curve_xx(csv_path, output_plot_path=None):
"""Generar gráfica de curva de sedimentación desde CSV"""
import matplotlib.pyplot as plt
df = pd.read_csv(csv_path)
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(df['time_minutes'], df['height_pixels'], 'b-', linewidth=2, label='Altura interfaz')
ax.scatter(df['time_minutes'], df['height_pixels'], c='red', s=20, alpha=0.5)
ax.set_xlabel('Tiempo (minutos)', fontsize=12, fontweight='bold')
ax.set_ylabel('Altura interfaz (píxeles)', fontsize=12, fontweight='bold')
ax.set_title('Curva de Sedimentación', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend()
plt.tight_layout()
if output_plot_path:
plt.savefig(output_plot_path, dpi=150)
print(f"✓ Gráfica guardada: {output_plot_path}")
plt.show()
return fig
def plot_sedimentation_curve(data, output_plot_path=None):
"""
Generar gráfica de curva de sedimentación
Args:
data: DataFrame de pandas o ruta a archivo CSV
output_plot_path: opcional, ruta para guardar la imagen
"""
import matplotlib.pyplot as plt
# Si es string, cargar CSV; si es DataFrame, usar directamente
if isinstance(data, str):
df = pd.read_csv(data)
else:
df = data
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(df['time_minutes'], df['height_pixels'], 'b-', linewidth=2, label='Altura interfaz')
ax.scatter(df['time_minutes'], df['height_pixels'], c='red', s=20, alpha=0.5)
ax.set_xlabel('Tiempo (minutos)', fontsize=12, fontweight='bold')
ax.set_ylabel('Altura interfaz (píxeles)', fontsize=12, fontweight='bold')
ax.set_title('Curva de Sedimentación', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend()
plt.tight_layout()
if output_plot_path:
plt.savefig(output_plot_path, dpi=150)
print(f"✓ Gráfica guardada: {output_plot_path}")
return fig