Spaces:
Sleeping
Sleeping
| """ | |
| Advanced emotion analysis service using DeepFace for EmotionMirror application. | |
| Provides high-precision facial emotion detection and analysis. | |
| """ | |
| import os | |
| import logging | |
| import tempfile | |
| import cv2 | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional | |
| import traceback | |
| # Importar settings desde config | |
| from config import settings | |
| from services.emotion_service import EmotionService | |
# Optional dependency: import DeepFace defensively and record whether it is
# usable, so the rest of the module can fall back to the basic service.
try:
    from deepface import DeepFace
except (ImportError, ModuleNotFoundError, ValueError) as import_error:
    DEEPFACE_AVAILABLE = False
    logging.warning(f"No se pudo importar DeepFace: {str(import_error)}. Se usará el servicio básico de emociones.")
else:
    DEEPFACE_AVAILABLE = True
    logging.info("DeepFace está disponible. Se puede usar la detección avanzada de emociones.")
class DeepFaceEmotionService(EmotionService):
    """
    Advanced emotion analysis service backed by DeepFace.

    Provides emotion, age and gender detection for a face image. Whenever
    DeepFace is unavailable or raises, the methods fall back to the basic
    ``EmotionService`` implementation (or to neutral default attributes).

    The class is a singleton: every ``DeepFaceEmotionService()`` call returns
    the same instance, and the one-time setup in ``__init__`` runs only once.
    """

    _instance = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super(DeepFaceEmotionService, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """
        Perform the one-time setup of the shared instance.

        Python calls ``__init__`` on every ``DeepFaceEmotionService()``
        expression, even though ``__new__`` hands back the existing object.
        The guard therefore has to run *before* any attribute is assigned;
        otherwise a second construction would reset ``deepface_initialized``
        to False and permanently disable the advanced service.
        """
        if getattr(self, '_initialized', False):
            return

        # Initialise the parent service first.
        super().__init__()

        # Attributes that must always exist, even when DeepFace is missing.
        self.deepface_initialized = False
        self.deepface_available = DEEPFACE_AVAILABLE
        self.actions = ['emotion', 'age', 'gender']
        self.detector_backend = "opencv"  # Default backend

        # Maps DeepFace emotion labels to the application's emotion names.
        self.emotion_mapping = {
            'angry': 'anger',
            'disgust': 'disgust',
            'fear': 'fear',
            'happy': 'joy',
            'sad': 'sadness',
            'surprise': 'surprise',
            'neutral': 'neutral',
        }

        self._initialized = True

        # Make sure the directory used for temporary face images exists.
        os.makedirs(settings.TEMP_DIR, exist_ok=True)

        if not DEEPFACE_AVAILABLE:
            logging.warning("DeepFace no está disponible. Usando EmotionService básico como respaldo.")
            return

        try:
            logging.info("Inicializando DeepFace...")
            # NOTE(review): confirm that the installed DeepFace version exposes
            # detector_backends(); if this call raises, the except branch below
            # leaves deepface_initialized set to False.
            detector_backends = DeepFace.detector_backends()
            logging.info(f"DeepFace detector backends disponibles: {detector_backends}")

            # Prefer opencv, then retinaface; otherwise keep the default.
            if "opencv" in detector_backends:
                self.detector_backend = "opencv"
            elif "retinaface" in detector_backends:
                self.detector_backend = "retinaface"

            self.deepface_initialized = True
            logging.info("DeepFace inicializado correctamente")
        except Exception as e:
            logging.error(f"Error al inicializar DeepFace: {str(e)}")
            logging.error(traceback.format_exc())
            self.deepface_initialized = False

    def is_advanced_service_active(self) -> bool:
        """
        Check if the advanced emotion service (DeepFace) is active and available.

        Returns:
            bool: True if DeepFace was imported and initialized successfully.
        """
        return self.deepface_available and self.deepface_initialized

    @staticmethod
    def _default_attributes() -> Dict[str, Any]:
        """Return a fresh dict of placeholder attributes for failed analyses."""
        return {
            'age': 0,
            'gender': 'unknown',
            'race': 'unknown',
        }

    @staticmethod
    def _write_temp_image(face_img: np.ndarray, prefix: str = "temp_face_") -> str:
        """
        Write ``face_img`` to a uniquely named JPEG inside ``settings.TEMP_DIR``.

        A unique name (instead of one fixed file name) keeps concurrent calls
        from overwriting or deleting each other's image.

        Returns:
            Path of the written file; the caller is responsible for removing it.

        Raises:
            IOError: if OpenCV reports that the image could not be written.
        """
        os.makedirs(settings.TEMP_DIR, exist_ok=True)
        fd, path = tempfile.mkstemp(prefix=prefix, suffix=".jpg", dir=settings.TEMP_DIR)
        os.close(fd)  # cv2.imwrite reopens the file by path
        try:
            # cv2.imwrite signals some failures through its return value only.
            if not cv2.imwrite(path, face_img):
                raise IOError(f"cv2.imwrite failed for {path}")
        except Exception:
            # Do not leave the empty placeholder file behind.
            try:
                os.remove(path)
            except OSError:
                pass
            raise
        return path

    @staticmethod
    def _remove_temp_file(path: Optional[str]) -> None:
        """Best-effort removal of a temporary file; failures are only logged."""
        if not path:
            return
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logging.warning(f"No se pudo eliminar el archivo temporal: {str(e)}")

    @staticmethod
    def _extract_gender(analysis_result: Dict[str, Any]) -> Any:
        """
        Return the gender label from a DeepFace result.

        Newer DeepFace releases put the label under ``dominant_gender`` and
        per-class scores under ``gender``; older ones store the label directly
        under ``gender``. Both layouts are reduced to the label here.
        """
        gender = analysis_result.get('dominant_gender')
        if gender is None:
            gender = analysis_result.get('gender', 'unknown')
        if isinstance(gender, dict):
            gender = max(gender, key=gender.get) if gender else 'unknown'
        return gender

    @staticmethod
    def _compute_image_features(face_img: np.ndarray) -> Dict[str, float]:
        """
        Compute brightness, contrast and left/right symmetry of a face image.

        These values are kept for compatibility with code that consumes the
        basic service's ``features`` dict.
        """
        # An image that is already single-channel needs no colour conversion.
        if face_img.ndim == 2:
            gray_face = face_img
        else:
            gray_face = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)

        brightness = np.mean(gray_face)
        contrast = np.std(gray_face)

        _, w = gray_face.shape
        mid = w // 2
        left_half = gray_face[:, :mid]
        right_half = cv2.flip(gray_face[:, mid:], 1)

        # The halves only match when the width is even; otherwise use a
        # neutral placeholder value.
        if left_half.shape == right_half.shape:
            symmetry = 1.0 - (np.sum(cv2.absdiff(left_half, right_half)) / (255 * left_half.size))
        else:
            symmetry = 0.5

        return {
            'brightness': float(brightness),
            'contrast': float(contrast),
            'symmetry': float(symmetry),
        }

    def analyze_emotion(self, face_img: np.ndarray) -> Dict[str, Any]:
        """
        Analyze emotions in a face image using DeepFace.

        Falls back to the basic ``EmotionService.analyze_emotion`` when
        DeepFace is unavailable, when the image cannot be written, or when
        the DeepFace analysis raises.

        Args:
            face_img: Face image to analyze.

        Returns:
            Dict with the primary ``emotion``, its ``confidence``, the full
            ``emotions`` score map, an ``is_advanced_detection`` flag and a
            ``features`` dict (brightness, contrast, symmetry, smile_score,
            age, gender) -- or whatever the basic service returns on fallback.
        """
        if not DEEPFACE_AVAILABLE:
            logging.info("DeepFace no está disponible. Usando servicio básico.")
            return super().analyze_emotion(face_img)

        temp_face_path = None
        try:
            # DeepFace is given a file path, so persist the image first.
            temp_face_path = self._write_temp_image(face_img)

            try:
                analysis_result = DeepFace.analyze(
                    img_path=temp_face_path,
                    actions=self.actions,
                    detector_backend=self.detector_backend,
                    enforce_detection=False  # Keep going even if no face is detected
                )

                # DeepFace may return a list of results; use the first one.
                if isinstance(analysis_result, list):
                    analysis_result = analysis_result[0]

                emotions_raw = analysis_result.get('emotion', {})

                # Map DeepFace labels to the standard set and rescale the
                # 0-100 scores to 0-1 plain Python floats.
                emotions = {}
                for df_emotion, score in emotions_raw.items():
                    label = df_emotion.lower()
                    standard_emotion = self.emotion_mapping.get(label, label)
                    emotions[standard_emotion] = float(score) / 100.0

                # Add any missing standard emotions with zero scores.
                for emotion in settings.EMOTIONS:
                    emotions.setdefault(emotion, 0.0)

                # Primary emotion is the one with the highest score.
                primary_name, primary_score = max(emotions.items(), key=lambda item: item[1])

                features = self._compute_image_features(face_img)
                features['smile_score'] = float(emotions.get('joy', 0.0))  # Use joy as smile score
                features['age'] = float(analysis_result.get('age', 0))
                features['gender'] = self._extract_gender(analysis_result)

                return {
                    "emotion": primary_name,
                    "confidence": float(primary_score),
                    "emotions": emotions,
                    "is_advanced_detection": True,
                    "features": features,
                }
            except Exception as e:
                # Log the error and fall back to the basic service.
                logging.error(f"Error en DeepFace.analyze: {str(e)}. Usando servicio básico.")
                return super().analyze_emotion(face_img)
        except Exception as e:
            # Failure while saving or processing the image.
            logging.error(f"Error al procesar la imagen para DeepFace: {str(e)}")
            return super().analyze_emotion(face_img)
        finally:
            # Always clean up the temporary file.
            self._remove_temp_file(temp_face_path)

    def get_advanced_attributes(self, face_img: np.ndarray) -> Dict[str, Any]:
        """
        Get advanced facial attributes using DeepFace.

        Args:
            face_img: Face image to analyze.

        Returns:
            Dictionary with ``age``, ``gender`` and ``race``. Placeholder
            values (0 / 'unknown') are returned when DeepFace is unavailable,
            the image is None or empty, or the analysis raises.
        """
        if not DEEPFACE_AVAILABLE or face_img is None or face_img.size == 0:
            return self._default_attributes()

        # Bound before the try so the finally block can always reference it.
        temp_face_path = None
        try:
            temp_face_path = self._write_temp_image(face_img, prefix="temp_face_attr_")

            attr_result = DeepFace.analyze(
                img_path=temp_face_path,
                actions=['age', 'gender', 'race'],
                detector_backend=self.detector_backend,
                enforce_detection=False,
            )

            # DeepFace may return a list of results; use the first one.
            if isinstance(attr_result, list):
                attr_result = attr_result[0]

            return {
                'age': attr_result.get('age', 0),
                'gender': self._extract_gender(attr_result),
                'race': attr_result.get('dominant_race', 'unknown'),
            }
        except Exception as e:
            logging.error(f"Error in advanced attribute detection: {e}")
            return self._default_attributes()
        finally:
            # Clean up the temporary file.
            self._remove_temp_file(temp_face_path)