Upload 2 files
Browse files- api.py +64 -10
- character_detection.py +32 -4
api.py
CHANGED
|
@@ -51,8 +51,12 @@ jobs: Dict[str, dict] = {}
|
|
| 51 |
|
| 52 |
def normalize_face_lighting(image):
|
| 53 |
"""
|
| 54 |
-
Normaliza el brillo de una imagen de cara usando
|
| 55 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 56 |
|
| 57 |
Args:
|
| 58 |
image: Imagen BGR (OpenCV format)
|
|
@@ -61,13 +65,28 @@ def normalize_face_lighting(image):
|
|
| 61 |
Imagen normalizada en el mismo formato
|
| 62 |
"""
|
| 63 |
import cv2
|
| 64 |
-
|
|
|
|
|
|
|
| 65 |
lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
|
| 66 |
l, a, b = cv2.split(lab)
|
| 67 |
|
| 68 |
-
# Aplicar CLAHE (Contrast Limited Adaptive Histogram Equalization) al canal L
|
| 69 |
-
|
| 70 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 71 |
|
| 72 |
# Recombinar canales
|
| 73 |
lab_normalized = cv2.merge([l_normalized, a, b])
|
|
@@ -78,7 +97,8 @@ def normalize_face_lighting(image):
|
|
| 78 |
|
| 79 |
def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int):
|
| 80 |
"""
|
| 81 |
-
Clustering jerárquico aglomerativo
|
|
|
|
| 82 |
Filtra clusters con menos de min_cluster_size muestras (marcados como -1/ruido).
|
| 83 |
|
| 84 |
Args:
|
|
@@ -91,6 +111,7 @@ def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int
|
|
| 91 |
"""
|
| 92 |
import numpy as np
|
| 93 |
from scipy.cluster.hierarchy import linkage, fcluster
|
|
|
|
| 94 |
from collections import Counter
|
| 95 |
|
| 96 |
if len(X) == 0:
|
|
@@ -103,8 +124,37 @@ def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int
|
|
| 103 |
# Linkage usando distancia euclidiana con método 'ward'
|
| 104 |
Z = linkage(X, method='ward', metric='euclidean')
|
| 105 |
|
| 106 |
-
#
|
| 107 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 108 |
|
| 109 |
# fcluster devuelve labels 1-indexed, convertir a 0-indexed
|
| 110 |
labels = labels - 1
|
|
@@ -258,7 +308,11 @@ def process_video_job(job_id: str):
|
|
| 258 |
# Detección de caras y embeddings (CPU), alineado con 'originales'
|
| 259 |
try:
|
| 260 |
print(f"[{job_id}] Iniciando detección de personajes (CPU, originales)...")
|
| 261 |
-
print(f"[{job_id}] Normalización de brillo
|
|
|
|
|
|
|
|
|
|
|
|
|
| 262 |
import cv2
|
| 263 |
import numpy as np
|
| 264 |
try:
|
|
|
|
| 51 |
|
| 52 |
def normalize_face_lighting(image):
|
| 53 |
"""
|
| 54 |
+
Normaliza el brillo de una imagen de cara usando técnicas combinadas:
|
| 55 |
+
1. CLAHE para ecualización adaptativa
|
| 56 |
+
2. Normalización de rango para homogeneizar brillo general
|
| 57 |
+
|
| 58 |
+
Esto reduce el impacto de diferentes condiciones de iluminación en los embeddings
|
| 59 |
+
y en la visualización de las imágenes.
|
| 60 |
|
| 61 |
Args:
|
| 62 |
image: Imagen BGR (OpenCV format)
|
|
|
|
| 65 |
Imagen normalizada en el mismo formato
|
| 66 |
"""
|
| 67 |
import cv2
|
| 68 |
+
import numpy as np
|
| 69 |
+
|
| 70 |
+
# Paso 1: Convertir a LAB color space (más robusto para iluminación)
|
| 71 |
lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
|
| 72 |
l, a, b = cv2.split(lab)
|
| 73 |
|
| 74 |
+
# Paso 2: Aplicar CLAHE (Contrast Limited Adaptive Histogram Equalization) al canal L
|
| 75 |
+
# Usar clipLimit más alto para normalización más agresiva
|
| 76 |
+
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
|
| 77 |
+
l_clahe = clahe.apply(l)
|
| 78 |
+
|
| 79 |
+
# Paso 3: Normalizar el rango del canal L para asegurar distribución uniforme
|
| 80 |
+
# Esto garantiza que todas las imágenes tengan un rango de brillo similar
|
| 81 |
+
l_min, l_max = l_clahe.min(), l_clahe.max()
|
| 82 |
+
if l_max > l_min:
|
| 83 |
+
# Estirar el histograma al rango completo [0, 255]
|
| 84 |
+
l_normalized = ((l_clahe - l_min) * 255.0 / (l_max - l_min)).astype(np.uint8)
|
| 85 |
+
else:
|
| 86 |
+
l_normalized = l_clahe
|
| 87 |
+
|
| 88 |
+
# Paso 4: Aplicar suavizado suave para reducir ruido introducido por la normalización
|
| 89 |
+
l_normalized = cv2.GaussianBlur(l_normalized, (3, 3), 0)
|
| 90 |
|
| 91 |
# Recombinar canales
|
| 92 |
lab_normalized = cv2.merge([l_normalized, a, b])
|
|
|
|
| 97 |
|
| 98 |
def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int):
|
| 99 |
"""
|
| 100 |
+
Clustering jerárquico aglomerativo con selección óptima del número de clusters.
|
| 101 |
+
Selecciona automáticamente el mejor número de clusters (hasta max_groups) usando silhouette score.
|
| 102 |
Filtra clusters con menos de min_cluster_size muestras (marcados como -1/ruido).
|
| 103 |
|
| 104 |
Args:
|
|
|
|
| 111 |
"""
|
| 112 |
import numpy as np
|
| 113 |
from scipy.cluster.hierarchy import linkage, fcluster
|
| 114 |
+
from sklearn.metrics import silhouette_score
|
| 115 |
from collections import Counter
|
| 116 |
|
| 117 |
if len(X) == 0:
|
|
|
|
| 124 |
# Linkage usando distancia euclidiana con método 'ward'
|
| 125 |
Z = linkage(X, method='ward', metric='euclidean')
|
| 126 |
|
| 127 |
+
# Encontrar el número óptimo de clusters usando silhouette score
|
| 128 |
+
best_n_clusters = 2
|
| 129 |
+
best_score = -1
|
| 130 |
+
|
| 131 |
+
# Probar diferentes números de clusters (de 2 a max_groups)
|
| 132 |
+
max_to_try = min(max_groups, len(X) - 1) # No puede haber más clusters que muestras
|
| 133 |
+
|
| 134 |
+
if max_to_try >= 2:
|
| 135 |
+
for n_clusters in range(2, max_to_try + 1):
|
| 136 |
+
trial_labels = fcluster(Z, t=n_clusters, criterion='maxclust') - 1
|
| 137 |
+
|
| 138 |
+
# Calcular cuántos clusters válidos tendríamos después del filtrado
|
| 139 |
+
trial_counts = Counter(trial_labels)
|
| 140 |
+
valid_clusters = sum(1 for count in trial_counts.values() if count >= min_cluster_size)
|
| 141 |
+
|
| 142 |
+
# Solo evaluar si hay al menos 2 clusters válidos
|
| 143 |
+
if valid_clusters >= 2:
|
| 144 |
+
try:
|
| 145 |
+
score = silhouette_score(X, trial_labels, metric='euclidean')
|
| 146 |
+
# Penalizar ligeramente configuraciones con muchos clusters para evitar overfitting
|
| 147 |
+
adjusted_score = score - (n_clusters * 0.01)
|
| 148 |
+
|
| 149 |
+
if adjusted_score > best_score:
|
| 150 |
+
best_score = adjusted_score
|
| 151 |
+
best_n_clusters = n_clusters
|
| 152 |
+
except:
|
| 153 |
+
pass # Si falla el cálculo, ignorar esta configuración
|
| 154 |
+
|
| 155 |
+
# Usar el número óptimo de clusters encontrado
|
| 156 |
+
print(f"Clustering óptimo: {best_n_clusters} clusters (de máximo {max_groups}), silhouette score: {best_score:.3f}")
|
| 157 |
+
labels = fcluster(Z, t=best_n_clusters, criterion='maxclust')
|
| 158 |
|
| 159 |
# fcluster devuelve labels 1-indexed, convertir a 0-indexed
|
| 160 |
labels = labels - 1
|
|
|
|
| 308 |
# Detección de caras y embeddings (CPU), alineado con 'originales'
|
| 309 |
try:
|
| 310 |
print(f"[{job_id}] Iniciando detección de personajes (CPU, originales)...")
|
| 311 |
+
print(f"[{job_id}] *** Normalización de brillo ACTIVADA ***")
|
| 312 |
+
print(f"[{job_id}] - CLAHE adaptativo (clipLimit=3.0)")
|
| 313 |
+
print(f"[{job_id}] - Estiramiento de histograma")
|
| 314 |
+
print(f"[{job_id}] - Suavizado Gaussiano")
|
| 315 |
+
print(f"[{job_id}] Esto homogeneizará el brillo de todas las caras detectadas")
|
| 316 |
import cv2
|
| 317 |
import numpy as np
|
| 318 |
try:
|
character_detection.py
CHANGED
|
@@ -174,7 +174,8 @@ class CharacterDetector:
|
|
| 174 |
|
| 175 |
def cluster_faces(self, embeddings_caras: List[Dict], max_groups: int, min_samples: int) -> np.ndarray:
|
| 176 |
"""
|
| 177 |
-
Agrupa caras similares usando clustering jerárquico aglomerativo.
|
|
|
|
| 178 |
|
| 179 |
Args:
|
| 180 |
embeddings_caras: Lista de embeddings de caras
|
|
@@ -196,9 +197,36 @@ class CharacterDetector:
|
|
| 196 |
# Si hay menos muestras que el mínimo, todo es ruido
|
| 197 |
return np.full(len(X), -1, dtype=int)
|
| 198 |
|
| 199 |
-
#
|
| 200 |
Z = linkage(X, method='ward', metric='euclidean')
|
| 201 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 202 |
|
| 203 |
# Filtrar clusters pequeños
|
| 204 |
label_counts = Counter(labels)
|
|
@@ -214,7 +242,7 @@ class CharacterDetector:
|
|
| 214 |
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
|
| 215 |
n_noise = list(labels).count(-1)
|
| 216 |
|
| 217 |
-
logger.info(f"Clusters encontrados: {n_clusters}, Ruido: {n_noise}")
|
| 218 |
return labels
|
| 219 |
|
| 220 |
def create_character_folders(self, embeddings_caras: List[Dict], labels: np.ndarray) -> List[Dict[str, Any]]:
|
|
|
|
| 174 |
|
| 175 |
def cluster_faces(self, embeddings_caras: List[Dict], max_groups: int, min_samples: int) -> np.ndarray:
|
| 176 |
"""
|
| 177 |
+
Agrupa caras similares usando clustering jerárquico aglomerativo con selección óptima.
|
| 178 |
+
Selecciona automáticamente el mejor número de clusters usando silhouette score.
|
| 179 |
|
| 180 |
Args:
|
| 181 |
embeddings_caras: Lista de embeddings de caras
|
|
|
|
| 197 |
# Si hay menos muestras que el mínimo, todo es ruido
|
| 198 |
return np.full(len(X), -1, dtype=int)
|
| 199 |
|
| 200 |
+
# Linkage usando distancia euclidiana con método 'ward'
|
| 201 |
Z = linkage(X, method='ward', metric='euclidean')
|
| 202 |
+
|
| 203 |
+
# Encontrar el número óptimo de clusters usando silhouette score
|
| 204 |
+
from sklearn.metrics import silhouette_score
|
| 205 |
+
best_n_clusters = 2
|
| 206 |
+
best_score = -1
|
| 207 |
+
|
| 208 |
+
max_to_try = min(max_groups, len(X) - 1)
|
| 209 |
+
|
| 210 |
+
if max_to_try >= 2:
|
| 211 |
+
for n_clusters in range(2, max_to_try + 1):
|
| 212 |
+
trial_labels = fcluster(Z, t=n_clusters, criterion='maxclust') - 1
|
| 213 |
+
|
| 214 |
+
trial_counts = Counter(trial_labels)
|
| 215 |
+
valid_clusters = sum(1 for count in trial_counts.values() if count >= min_samples)
|
| 216 |
+
|
| 217 |
+
if valid_clusters >= 2:
|
| 218 |
+
try:
|
| 219 |
+
score = silhouette_score(X, trial_labels, metric='euclidean')
|
| 220 |
+
adjusted_score = score - (n_clusters * 0.01)
|
| 221 |
+
|
| 222 |
+
if adjusted_score > best_score:
|
| 223 |
+
best_score = adjusted_score
|
| 224 |
+
best_n_clusters = n_clusters
|
| 225 |
+
except:
|
| 226 |
+
pass
|
| 227 |
+
|
| 228 |
+
logger.info(f"Clustering óptimo: {best_n_clusters} clusters (de máximo {max_groups}), silhouette: {best_score:.3f}")
|
| 229 |
+
labels = fcluster(Z, t=best_n_clusters, criterion='maxclust') - 1
|
| 230 |
|
| 231 |
# Filtrar clusters pequeños
|
| 232 |
label_counts = Counter(labels)
|
|
|
|
| 242 |
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
|
| 243 |
n_noise = list(labels).count(-1)
|
| 244 |
|
| 245 |
+
logger.info(f"Clusters válidos encontrados: {n_clusters}, Ruido: {n_noise}")
|
| 246 |
return labels
|
| 247 |
|
| 248 |
def create_character_folders(self, embeddings_caras: List[Dict], labels: np.ndarray) -> List[Dict[str, Any]]:
|