| """ | |
| Inference pipeline for emotion recognition. | |
| """ | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import cv2 | |
| from PIL import Image | |
| import tensorflow as tf | |
| from tensorflow.keras.models import Model | |
| import sys | |
| sys.path.append(str(Path(__file__).parent.parent.parent)) | |
| from src.config import ( | |
| IMAGE_SIZE, IMAGE_SIZE_TRANSFER, EMOTION_CLASSES, IDX_TO_EMOTION, | |
| INTENSITY_HIGH_THRESHOLD, INTENSITY_MEDIUM_THRESHOLD, | |
| CUSTOM_CNN_PATH, MOBILENET_PATH, VGG_PATH | |
| ) | |
| from src.preprocessing.face_detector import FaceDetector | |
| from src.models.model_utils import load_model | |
class EmotionPredictor:
    """Unified prediction interface for emotion recognition."""

    def __init__(
        self,
        model_name: str = "custom_cnn",
        model_path: Optional[Path] = None,
        use_face_detection: bool = True
    ):
        """
        Initialize the predictor.

        Args:
            model_name: Name of the model ('custom_cnn', 'mobilenet', 'vgg19')
            model_path: Optional custom model path
            use_face_detection: Whether to detect faces before prediction
        """
        self.model_name = model_name
        self.model = None
        self.face_detector = FaceDetector() if use_face_detection else None

        # Resolve the checkpoint location: an explicit path wins; otherwise
        # fall back to the configured default for the chosen architecture.
        # Unknown names yield None here, which load() reports later.
        if model_path:
            self.model_path = Path(model_path)
        else:
            default_paths = {
                "custom_cnn": CUSTOM_CNN_PATH,
                "mobilenet": MOBILENET_PATH,
                "vgg19": VGG_PATH,
            }
            self.model_path = default_paths.get(model_name)

        # Transfer-learning backbones take the larger RGB input size; the
        # custom CNN uses the base (grayscale) input size.
        self.is_transfer_model = model_name in ("mobilenet", "vgg19")
        self.target_size = IMAGE_SIZE_TRANSFER if self.is_transfer_model else IMAGE_SIZE
        self.use_rgb = self.is_transfer_model
| def load(self) -> bool: | |
| """ | |
| Load the model. | |
| Returns: | |
| True if model loaded successfully | |
| """ | |
| try: | |
| if self.model_path and self.model_path.exists(): | |
| self.model = load_model(self.model_path) | |
| return True | |
| else: | |
| print(f"Model file not found: {self.model_path}") | |
| return False | |
| except Exception as e: | |
| print(f"Error loading model: {e}") | |
| return False | |
    def preprocess_image(
        self,
        image: np.ndarray,
        detect_face: bool = True
    ) -> Tuple[Optional[np.ndarray], List[dict]]:
        """
        Preprocess an image for prediction.

        Args:
            image: Input image (BGR or RGB format)
            detect_face: Whether to detect and extract face

        Returns:
            Tuple of (preprocessed image with batch dimension added, face
            detection info); the image is None when face detection was
            requested but found no face.
        """
        faces_info = []
        if detect_face and self.face_detector:
            # Detect and extract face; the detector also resizes the crop
            # and converts it to grayscale for non-RGB models.
            face, faces_info = self.face_detector.detect_and_extract(
                image,
                target_size=self.target_size,
                to_grayscale=not self.use_rgb
            )
            if face is None:
                # No face found: signal failure but still return detector info.
                return None, faces_info
            processed = face
        else:
            # No detection requested/available: resize the whole frame.
            processed = cv2.resize(image, self.target_size)
        # Convert color to match the model's expected channel count.
        if self.use_rgb:
            if len(processed.shape) == 2:
                processed = cv2.cvtColor(processed, cv2.COLOR_GRAY2RGB)
            elif processed.shape[2] == 1:
                processed = np.repeat(processed, 3, axis=2)
            # NOTE(review): a 3-channel input passes through unchanged here,
            # so a BGR frame is NOT swapped to RGB — confirm callers supply
            # the channel order the transfer models were trained on.
        else:
            if len(processed.shape) == 3 and processed.shape[2] == 3:
                processed = cv2.cvtColor(processed, cv2.COLOR_BGR2GRAY)
        # Normalize pixel values to [0, 1].
        processed = processed.astype(np.float32) / 255.0
        # Add channel dimension if grayscale (H, W) -> (H, W, 1).
        if len(processed.shape) == 2:
            processed = np.expand_dims(processed, axis=-1)
        # Add batch dimension -> (1, H, W, C) for model.predict.
        processed = np.expand_dims(processed, axis=0)
        return processed, faces_info
| def predict( | |
| self, | |
| image: Union[np.ndarray, str, Path], | |
| detect_face: bool = True, | |
| return_all_scores: bool = True | |
| ) -> Dict: | |
| """ | |
| Predict emotion from an image. | |
| Args: | |
| image: Input image (array, file path, or PIL Image) | |
| detect_face: Whether to detect face first | |
| return_all_scores: Whether to return all class scores | |
| Returns: | |
| Prediction result dictionary | |
| """ | |
| if self.model is None: | |
| success = self.load() | |
| if not success: | |
| return {"error": "Model not loaded"} | |
| # Load image if path provided | |
| if isinstance(image, (str, Path)): | |
| image = cv2.imread(str(image)) | |
| if image is None: | |
| return {"error": f"Could not load image: {image}"} | |
| elif isinstance(image, Image.Image): | |
| image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) | |
| # Preprocess | |
| processed, faces_info = self.preprocess_image(image, detect_face) | |
| if processed is None: | |
| return { | |
| "error": "No face detected", | |
| "face_detected": False, | |
| "faces_info": faces_info | |
| } | |
| # Predict | |
| predictions = self.model.predict(processed, verbose=0) | |
| # Get top prediction | |
| pred_idx = int(np.argmax(predictions[0])) | |
| confidence = float(predictions[0][pred_idx]) | |
| emotion = IDX_TO_EMOTION[pred_idx] | |
| # Calculate intensity | |
| intensity = self._calculate_intensity(confidence) | |
| result = { | |
| "emotion": emotion, | |
| "confidence": confidence, | |
| "intensity": intensity, | |
| "face_detected": len(faces_info) > 0, | |
| "faces_info": faces_info, | |
| "model_used": self.model_name | |
| } | |
| if return_all_scores: | |
| result["all_probabilities"] = { | |
| EMOTION_CLASSES[i]: float(predictions[0][i]) | |
| for i in range(len(EMOTION_CLASSES)) | |
| } | |
| return result | |
| def predict_batch( | |
| self, | |
| images: List[Union[np.ndarray, str, Path]], | |
| detect_face: bool = True | |
| ) -> Dict: | |
| """ | |
| Predict emotions for multiple images. | |
| Args: | |
| images: List of images | |
| detect_face: Whether to detect faces | |
| Returns: | |
| Batch prediction results | |
| """ | |
| results = [] | |
| emotion_counts = {e: 0 for e in EMOTION_CLASSES} | |
| successful_predictions = 0 | |
| for i, image in enumerate(images): | |
| result = self.predict(image, detect_face) | |
| result["image_index"] = i | |
| results.append(result) | |
| if "error" not in result: | |
| emotion_counts[result["emotion"]] += 1 | |
| successful_predictions += 1 | |
| # Calculate distribution | |
| if successful_predictions > 0: | |
| emotion_distribution = { | |
| e: count / successful_predictions | |
| for e, count in emotion_counts.items() | |
| } | |
| else: | |
| emotion_distribution = {e: 0.0 for e in EMOTION_CLASSES} | |
| # Find dominant emotion | |
| dominant_emotion = max(emotion_counts.items(), key=lambda x: x[1]) | |
| return { | |
| "results": results, | |
| "summary": { | |
| "total_images": len(images), | |
| "successful_predictions": successful_predictions, | |
| "failed_predictions": len(images) - successful_predictions, | |
| "emotion_counts": emotion_counts, | |
| "emotion_distribution": emotion_distribution, | |
| "dominant_emotion": dominant_emotion[0], | |
| "dominant_emotion_count": dominant_emotion[1] | |
| }, | |
| "model_used": self.model_name | |
| } | |
| def _calculate_intensity(self, confidence: float) -> str: | |
| """ | |
| Calculate emotion intensity based on confidence. | |
| Args: | |
| confidence: Prediction confidence | |
| Returns: | |
| Intensity level ('high', 'medium', 'low') | |
| """ | |
| if confidence >= INTENSITY_HIGH_THRESHOLD: | |
| return "high" | |
| elif confidence >= INTENSITY_MEDIUM_THRESHOLD: | |
| return "medium" | |
| else: | |
| return "low" | |
| def visualize_prediction( | |
| self, | |
| image: np.ndarray, | |
| prediction: Dict | |
| ) -> np.ndarray: | |
| """ | |
| Visualize prediction on image. | |
| Args: | |
| image: Original image | |
| prediction: Prediction result | |
| Returns: | |
| Image with visualizations | |
| """ | |
| result = image.copy() | |
| if self.face_detector and prediction.get("faces_info"): | |
| # Draw face detection and emotion label | |
| result = self.face_detector.draw_detections( | |
| result, | |
| prediction["faces_info"], | |
| emotions=[prediction.get("emotion", "Unknown")], | |
| confidences=[prediction.get("confidence", 0)] | |
| ) | |
| return result | |
| def get_available_models() -> Dict[str, bool]: | |
| """ | |
| Get available trained models. | |
| Returns: | |
| Dictionary of model name -> availability | |
| """ | |
| return { | |
| "custom_cnn": CUSTOM_CNN_PATH.exists(), | |
| "mobilenet": MOBILENET_PATH.exists(), | |
| "vgg19": VGG_PATH.exists() | |
| } | |
def create_predictor(
    model_name: str = "custom_cnn",
    auto_load: bool = True
) -> Optional[EmotionPredictor]:
    """
    Factory function to create a predictor.

    Args:
        model_name: Name of the model
        auto_load: Whether to automatically load the model

    Returns:
        EmotionPredictor instance, or None when auto-loading was
        requested and failed.
    """
    predictor = EmotionPredictor(model_name)
    # Treat a failed eager load as construction failure.
    if auto_load and not predictor.load():
        return None
    return predictor
if __name__ == "__main__":
    # Quick CLI check: report which trained model files are present on disk.
    print("Available models:")
    for model_name, is_available in EmotionPredictor.get_available_models().items():
        marker = "✓" if is_available else "✗"
        print(f"  {marker} {model_name}")