| """ |
| Character Detection Module |
| Integra el trabajo de Ana para detección de personajes mediante: |
| 1. Extracción de caras y embeddings |
| 2. Extracción de voces y embeddings |
| 3. Clustering con DBSCAN |
| 4. Generación de carpetas por personaje |
| """ |
| import cv2 |
| import os |
| import json |
| import logging |
| import shutil |
| from pathlib import Path |
| from sklearn.cluster import DBSCAN |
| import numpy as np |
| from typing import List, Dict, Any, Tuple |
|
|
| |
# Configure logging BEFORE anything below can emit a record. The module-level
# ``logging.warning()`` helper implicitly calls ``basicConfig()`` with default
# settings when the root logger has no handlers; if that happened first, the
# ``basicConfig(level=INFO)`` call would be a no-op and every INFO message of
# this module would be hidden whenever DeepFace is missing.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# DeepFace is treated as an optional dependency: when it cannot be imported the
# module still loads and face extraction returns an empty list.
try:
    from deepface import DeepFace
    DEEPFACE_AVAILABLE = True
except Exception as e:
    # Kept broad as in the original code; presumably the import can fail with
    # more than ImportError (e.g. errors raised by DeepFace's own dependencies).
    DEEPFACE_AVAILABLE = False
    logger.warning(f"DeepFace no disponible: {e}")
|
|
|
|
class CharacterDetector:
    """Detect the characters appearing in a video by clustering their faces.

    The pipeline (an integration of Ana's original work) is:

    1. Sample frames from the video and compute one embedding per detected
       face with DeepFace (``Facenet512`` model).
    2. Extract voice and scene embeddings (currently stubs returning ``[]``).
    3. Cluster the face embeddings with DBSCAN.
    4. Create one folder per cluster ("character") with its face crops.
    """

    # Frame rate assumed when OpenCV cannot report one for the video.
    _FALLBACK_FPS = 25

    def __init__(self, video_path: str, output_base: Path, video_name: str = None):
        """Create the output directory layout for one video.

        Args:
            video_path: Path to the video file.
            output_base: Base directory where results are written
                (e.g. ``/tmp/temp/video_name``). Created if missing.
            video_name: Name of the video, used to build image URLs.
                Defaults to the last path component of ``output_base``.
        """
        self.video_path = video_path
        self.output_base = Path(output_base)
        self.output_base.mkdir(parents=True, exist_ok=True)
        self.video_name = video_name or self.output_base.name

        self.faces_dir = self.output_base / "faces"
        self.voices_dir = self.output_base / "voices"
        self.scenes_dir = self.output_base / "scenes"

        for directory in (self.faces_dir, self.voices_dir, self.scenes_dir):
            directory.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _sampling_interval(fps: float, extract_every_sec: float) -> int:
        """Return the number of frames between two sampled frames (always >= 1)."""
        effective_fps = fps if fps > 0 else CharacterDetector._FALLBACK_FPS
        # Clamp to 1 so the modulo in the sampling loop can never divide by zero
        # (unreadable FPS, or an interval shorter than one frame).
        return max(1, int(effective_fps * float(extract_every_sec)))

    @staticmethod
    def _bounding_box(facial_area: Dict[str, Any]) -> Tuple[int, int, int, int]:
        """Return ``(x, y, w, h)`` from a ``facial_area`` dict; missing keys count as 0."""
        return tuple(int(facial_area.get(key) or 0) for key in ('x', 'y', 'w', 'h'))

    @staticmethod
    def _crop_face(frame: np.ndarray, box: Tuple[int, int, int, int]) -> np.ndarray:
        """Return the part of ``frame`` inside ``box``, clipped to the frame bounds.

        The whole frame is returned when the clipped box is empty.
        """
        x, y, w, h = box
        left, top = max(0, x), max(0, y)
        right = min(frame.shape[1], x + w)
        bottom = min(frame.shape[0], y + h)
        if right > left and bottom > top:
            return frame[top:bottom, left:right]
        return frame

    @staticmethod
    def _json_default(value: Any) -> Any:
        """``json.dump`` fallback that converts NumPy scalars/arrays to plain Python."""
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, np.ndarray):
            return value.tolist()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    def _detect_faces(self, frame: np.ndarray, frame_count: int, temp_path: Path,
                      detector_backend: str, enforce_detection: bool) -> List[Dict[str, Any]]:
        """Run DeepFace on one frame and return its face objects (``[]`` on failure)."""
        cv2.imwrite(str(temp_path), frame)
        try:
            face_objs = DeepFace.represent(
                img_path=str(temp_path),
                model_name='Facenet512',
                detector_backend=detector_backend,
                enforce_detection=enforce_detection
            )
            return face_objs or []
        except Exception as e:
            # Best effort by design: a frame that cannot be analysed is skipped.
            logger.debug(f"No se detectaron caras en frame {frame_count}: {e}")
            return []
        finally:
            # The temporary frame is removed whether or not detection succeeded.
            if temp_path.exists():
                os.remove(temp_path)

    def extract_faces_embeddings(self, *, start_offset_sec: float = 3.0, extract_every_sec: float = 0.5,
                                 detector_backend: str = 'retinaface', min_face_area: int = 100,
                                 enforce_detection: bool = False) -> List[Dict[str, Any]]:
        """Extract faces from the video and compute their embeddings with DeepFace.

        Args:
            start_offset_sec: Seconds skipped at the beginning of the video.
            extract_every_sec: Time between two analysed frames, in seconds.
            detector_backend: Face detector backend passed to DeepFace.
            min_face_area: Faces whose ``w * h`` is below this value are dropped.
            enforce_detection: Forwarded to ``DeepFace.represent``.

        Returns:
            A list of dicts with the keys ``"embeddings"``, ``"path"`` (saved
            face crop), ``"frame"`` (frame index) and ``"facial_area"``.
            Empty when DeepFace is unavailable or the video cannot be opened.
        """
        if not DEEPFACE_AVAILABLE:
            logger.warning("DeepFace no disponible, retornando lista vacía")
            return []

        logger.info("Extrayendo caras del vídeo con DeepFace...")

        embeddings_caras: List[Dict[str, Any]] = []
        min_area = int(min_face_area)
        temp_path = self.faces_dir / "temp_frame.jpg"
        video = cv2.VideoCapture(self.video_path)
        try:
            if not video.isOpened():
                logger.error(f"No se pudo abrir el vídeo: {self.video_path}")
                return embeddings_caras

            fps = int(video.get(cv2.CAP_PROP_FPS))
            total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
            effective_fps = fps if fps > 0 else self._FALLBACK_FPS
            frame_interval = self._sampling_interval(fps, extract_every_sec)
            start_frame = int(max(0.0, start_offset_sec) * effective_fps)

            logger.info(f"Total frames: {total_frames}, FPS: {fps}, Procesando cada {frame_interval} frames")

            frame_count = 0
            while True:
                ret, frame = video.read()
                if not ret:
                    break

                if frame_count >= start_frame and frame_count % frame_interval == 0:
                    # NOTE(review): with enforce_detection=False DeepFace appears to
                    # return an entry for the whole frame when no face is found;
                    # confirm whether such entries should be filtered out.
                    face_objs = self._detect_faces(frame, frame_count, temp_path,
                                                   detector_backend, enforce_detection)
                    for face_obj in face_objs:
                        try:
                            embedding = face_obj['embedding']
                            facial_area = face_obj.get('facial_area') or {}
                            box = self._bounding_box(facial_area)
                        except (AttributeError, KeyError, TypeError, ValueError) as e:
                            logger.debug(f"Cara descartada en frame {frame_count}: {e}")
                            continue

                        if box[2] * box[3] < min_area:
                            continue

                        # Crops are numbered by how many faces were kept so far.
                        save_path = self.faces_dir / f"face_{len(embeddings_caras):04d}.jpg"
                        cv2.imwrite(str(save_path), self._crop_face(frame, box))

                        embeddings_caras.append({
                            "embeddings": embedding,
                            "path": str(save_path),
                            "frame": frame_count,
                            "facial_area": facial_area
                        })

                    if frame_count % (frame_interval * 10) == 0:
                        logger.info(f"Progreso: frame {frame_count}/{total_frames}, caras detectadas: {len(embeddings_caras)}")

                frame_count += 1
        finally:
            # Always free the capture handle, even if a frame raised.
            video.release()

        logger.info(f"✓ Caras extraídas: {len(embeddings_caras)}")
        return embeddings_caras

    def extract_voices_embeddings(self) -> List[Dict[str, Any]]:
        """Extract voice embeddings from the video.

        Optional feature, currently disabled: always returns an empty list.

        Returns:
            A list of dicts shaped like ``{"embeddings": [...], "path": "..."}``.
        """
        logger.info("Extracción de voces deshabilitada temporalmente")
        return []

    def extract_scenes_embeddings(self) -> List[Dict[str, Any]]:
        """Extract key-scene embeddings from the video.

        Optional feature, currently disabled: always returns an empty list.

        Returns:
            A list of dicts shaped like ``{"embeddings": [...], "path": "..."}``.
        """
        logger.info("Extracción de escenas deshabilitada temporalmente")
        return []

    def cluster_faces(self, embeddings_caras: List[Dict], epsilon: float, min_samples: int) -> np.ndarray:
        """Group similar faces with DBSCAN (based on Ana's ``get_face_clusters``).

        Args:
            embeddings_caras: Face records holding an ``"embeddings"`` vector.
            epsilon: DBSCAN ``eps`` parameter.
            min_samples: DBSCAN ``min_samples`` parameter.

        Returns:
            Array with the cluster label of each face (``-1`` marks noise);
            an empty integer array when there are no faces.
        """
        if not embeddings_caras:
            # Integer dtype keeps the empty result consistent with DBSCAN labels.
            return np.array([], dtype=int)

        logger.info(f"Clustering {len(embeddings_caras)} caras con eps={epsilon}, min_samples={min_samples}")

        X = np.array([cara['embeddings'] for cara in embeddings_caras])
        clustering = DBSCAN(eps=epsilon, min_samples=min_samples, metric='euclidean').fit(X)
        labels = clustering.labels_

        n_clusters = len(set(labels.tolist()) - {-1})
        n_noise = int(np.count_nonzero(labels == -1))

        logger.info(f"Clusters encontrados: {n_clusters}, Ruido: {n_noise}")
        return labels

    def create_character_folders(self, embeddings_caras: List[Dict], labels: np.ndarray) -> List[Dict[str, Any]]:
        """Create one folder per detected character and copy its faces into it.

        Args:
            embeddings_caras: Face records holding the ``"path"`` of each crop.
            labels: Cluster label of each face; faces labelled ``-1`` are ignored.

        Returns:
            One metadata dict per character with the keys ``"id"``, ``"name"``,
            ``"image_path"``, ``"image_url"``, ``"num_faces"`` and ``"folder"``.
        """
        # Face indices grouped by cluster, in order of first appearance.
        clusters: Dict[int, List[int]] = {}
        for idx, label in enumerate(labels):
            if label == -1:
                continue
            # Plain ints keep ids and names independent of NumPy scalar types.
            clusters.setdefault(int(label), []).append(idx)

        logger.info(f"Creando carpetas para {len(clusters)} personajes...")

        characters = []
        for cluster_id, face_indices in clusters.items():
            char_id = f"char{cluster_id + 1}"
            char_dir = self.output_base / char_id
            char_dir.mkdir(parents=True, exist_ok=True)

            for i, face_idx in enumerate(face_indices):
                src_path = Path(embeddings_caras[face_idx]['path'])
                if src_path.exists():
                    shutil.copy(src_path, char_dir / f"face_{i:03d}.jpg")

            # The first face of the cluster is used as its representative image.
            representative_src = Path(embeddings_caras[face_indices[0]]['path'])
            representative_dst = char_dir / "representative.jpg"
            if representative_src.exists():
                shutil.copy(representative_src, representative_dst)

            characters.append({
                "id": char_id,
                "name": f"Personatge {cluster_id + 1}",
                "image_path": str(representative_dst),
                "image_url": f"/files/{self.video_name}/{char_id}/representative.jpg",
                "num_faces": len(face_indices),
                "folder": str(char_dir)
            })

        logger.info(f"Carpetas creadas para {len(characters)} personajes")
        return characters

    def save_analysis_json(self, embeddings_caras: List[Dict], embeddings_voices: List[Dict],
                           embeddings_escenas: List[Dict]) -> Path:
        """Save the complete analysis to ``analysis.json`` (like Ana's file).

        Writing is best effort: a failure is logged as a warning and the
        target path is returned anyway.

        Returns:
            Path of the JSON file inside ``output_base``.
        """
        analysis_data = {
            "caras": embeddings_caras,
            "voices": embeddings_voices,
            "escenas": embeddings_escenas
        }

        analysis_path = self.output_base / "analysis.json"

        try:
            with open(analysis_path, "w", encoding="utf-8") as f:
                # default= converts NumPy values that json cannot encode natively.
                json.dump(analysis_data, f, indent=2, ensure_ascii=False,
                          default=self._json_default)
            logger.info(f"Analysis JSON guardado: {analysis_path}")
        except Exception as e:
            logger.warning(f"Error al guardar analysis JSON: {e}")

        return analysis_path

    def detect_characters(self, epsilon: float = 0.5, min_cluster_size: int = 2,
                          *, start_offset_sec: float = 3.0, extract_every_sec: float = 0.5) -> Tuple[List[Dict], Path, np.ndarray, List[Dict[str, Any]]]:
        """Run the complete character-detection pipeline.

        Args:
            epsilon: DBSCAN ``eps`` parameter.
            min_cluster_size: Passed to DBSCAN as ``min_samples``.
            start_offset_sec: Seconds skipped at the beginning of the video.
            extract_every_sec: Time between two analysed frames, in seconds.

        Returns:
            Tuple ``(characters, analysis_path, labels, embeddings_caras)``:
            the character metadata, the path of ``analysis.json``, the cluster
            label of each face and the face records themselves.
        """
        embeddings_caras = self.extract_faces_embeddings(start_offset_sec=start_offset_sec,
                                                         extract_every_sec=extract_every_sec)
        embeddings_voices = self.extract_voices_embeddings()
        embeddings_escenas = self.extract_scenes_embeddings()

        # The raw analysis is written before clustering, as in the original flow.
        analysis_path = self.save_analysis_json(embeddings_caras, embeddings_voices, embeddings_escenas)

        labels = self.cluster_faces(embeddings_caras, epsilon, min_cluster_size)
        characters = self.create_character_folders(embeddings_caras, labels)

        return characters, analysis_path, labels, embeddings_caras
|
|
|
|
| |
def detect_characters_from_video(video_path: str, output_base: str,
                                 epsilon: float = 0.5, min_cluster_size: int = 2,
                                 video_name: str = None,
                                 *, start_offset_sec: float = 3.0, extract_every_sec: float = 0.5) -> Dict[str, Any]:
    """Detect the characters of a video in a single call.

    Builds a ``CharacterDetector`` for the video, runs its
    ``detect_characters`` pipeline and packs the outcome into a dict.

    Args:
        video_path: Path to the video.
        output_base: Base directory where results are written.
        epsilon: Epsilon parameter for DBSCAN.
        min_cluster_size: Minimum cluster size.
        video_name: Name of the video (used to build URLs).
        start_offset_sec: Forwarded to ``detect_characters``.
        extract_every_sec: Forwarded to ``detect_characters``.

    Returns:
        Dict with the keys ``"characters"``, ``"analysis_path"``,
        ``"num_characters"``, ``"face_labels"`` and ``"num_face_embeddings"``.
    """
    pipeline = CharacterDetector(video_path, Path(output_base), video_name=video_name)
    outcome = pipeline.detect_characters(
        epsilon,
        min_cluster_size,
        start_offset_sec=start_offset_sec,
        extract_every_sec=extract_every_sec,
    )
    found, json_path, cluster_labels, face_records = outcome

    # Labels are returned as a plain list, whatever sequence type came back.
    if isinstance(cluster_labels, np.ndarray):
        label_list = cluster_labels.tolist()
    else:
        label_list = list(cluster_labels)

    summary = {"characters": found, "analysis_path": str(json_path)}
    summary["num_characters"] = len(found)
    summary["face_labels"] = label_list
    summary["num_face_embeddings"] = len(face_records)
    return summary
|
|