""" Data loading utilities for the Emotion Recognition System. """ import os import numpy as np from pathlib import Path from typing import Tuple, Dict, Optional from collections import Counter import tensorflow as tf from tensorflow.keras.preprocessing.image import ImageDataGenerator import sys sys.path.append(str(Path(__file__).parent.parent.parent)) from src.config import ( TRAIN_DIR, TEST_DIR, IMAGE_SIZE, IMAGE_SIZE_TRANSFER, BATCH_SIZE, VALIDATION_SPLIT, EMOTION_CLASSES, NUM_CLASSES, AUGMENTATION_CONFIG ) def create_data_generators( use_augmentation: bool = True, for_transfer_learning: bool = False, batch_size: int = BATCH_SIZE, validation_split: float = VALIDATION_SPLIT ) -> Tuple[tf.keras.preprocessing.image.DirectoryIterator, tf.keras.preprocessing.image.DirectoryIterator, tf.keras.preprocessing.image.DirectoryIterator]: """ Create data generators for training, validation, and testing. Args: use_augmentation: Whether to apply data augmentation for training for_transfer_learning: If True, resize images for transfer learning models batch_size: Batch size for generators validation_split: Fraction of training data to use for validation Returns: Tuple of (train_generator, val_generator, test_generator) """ target_size = IMAGE_SIZE_TRANSFER if for_transfer_learning else IMAGE_SIZE color_mode = 'rgb' if for_transfer_learning else 'grayscale' # Training data generator with augmentation if use_augmentation: train_datagen = ImageDataGenerator( rescale=1./255, rotation_range=AUGMENTATION_CONFIG["rotation_range"], width_shift_range=AUGMENTATION_CONFIG["width_shift_range"], height_shift_range=AUGMENTATION_CONFIG["height_shift_range"], horizontal_flip=AUGMENTATION_CONFIG["horizontal_flip"], zoom_range=AUGMENTATION_CONFIG["zoom_range"], brightness_range=AUGMENTATION_CONFIG["brightness_range"], fill_mode=AUGMENTATION_CONFIG["fill_mode"], validation_split=validation_split ) else: train_datagen = ImageDataGenerator( rescale=1./255, validation_split=validation_split ) # Test data generator (no augmentation) test_datagen = ImageDataGenerator(rescale=1./255) # Create generators train_generator = train_datagen.flow_from_directory( str(TRAIN_DIR), target_size=target_size, color_mode=color_mode, batch_size=batch_size, class_mode='categorical', classes=EMOTION_CLASSES, subset='training', shuffle=True ) val_generator = train_datagen.flow_from_directory( str(TRAIN_DIR), target_size=target_size, color_mode=color_mode, batch_size=batch_size, class_mode='categorical', classes=EMOTION_CLASSES, subset='validation', shuffle=False ) test_generator = test_datagen.flow_from_directory( str(TEST_DIR), target_size=target_size, color_mode=color_mode, batch_size=batch_size, class_mode='categorical', classes=EMOTION_CLASSES, shuffle=False ) return train_generator, val_generator, test_generator def load_dataset( for_transfer_learning: bool = False ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Load the entire dataset into memory as numpy arrays. Args: for_transfer_learning: If True, resize images for transfer learning models Returns: Tuple of (X_train, y_train, X_test, y_test) """ target_size = IMAGE_SIZE_TRANSFER if for_transfer_learning else IMAGE_SIZE color_mode = 'rgb' if for_transfer_learning else 'grayscale' datagen = ImageDataGenerator(rescale=1./255) # Load training data train_gen = datagen.flow_from_directory( str(TRAIN_DIR), target_size=target_size, color_mode=color_mode, batch_size=1, class_mode='categorical', classes=EMOTION_CLASSES, shuffle=False ) # Load test data test_gen = datagen.flow_from_directory( str(TEST_DIR), target_size=target_size, color_mode=color_mode, batch_size=1, class_mode='categorical', classes=EMOTION_CLASSES, shuffle=False ) # Extract all data X_train = np.concatenate([train_gen[i][0] for i in range(len(train_gen))]) y_train = np.concatenate([train_gen[i][1] for i in range(len(train_gen))]) X_test = np.concatenate([test_gen[i][0] for i in range(len(test_gen))]) y_test = np.concatenate([test_gen[i][1] for i in range(len(test_gen))]) return X_train, y_train, X_test, y_test def get_class_weights(train_generator) -> Dict[int, float]: """ Calculate class weights to handle class imbalance. Args: train_generator: Training data generator Returns: Dictionary mapping class indices to weights """ # Get class distribution class_counts = Counter(train_generator.classes) total_samples = sum(class_counts.values()) num_classes = len(class_counts) # Calculate weights (inverse frequency) class_weights = {} for class_idx, count in class_counts.items(): class_weights[class_idx] = total_samples / (num_classes * count) return class_weights def get_dataset_info() -> Dict: """ Get information about the dataset. Returns: Dictionary with dataset statistics """ info = { "train": {}, "test": {}, "emotion_classes": EMOTION_CLASSES, "num_classes": NUM_CLASSES } # Count training samples per class for emotion in EMOTION_CLASSES: train_path = TRAIN_DIR / emotion test_path = TEST_DIR / emotion if train_path.exists(): info["train"][emotion] = len(list(train_path.glob("*.png"))) + len(list(train_path.glob("*.jpg"))) else: info["train"][emotion] = 0 if test_path.exists(): info["test"][emotion] = len(list(test_path.glob("*.png"))) + len(list(test_path.glob("*.jpg"))) else: info["test"][emotion] = 0 info["total_train"] = sum(info["train"].values()) info["total_test"] = sum(info["test"].values()) return info if __name__ == "__main__": # Test data loading print("Dataset Information:") info = get_dataset_info() print(f"Total training samples: {info['total_train']}") print(f"Total test samples: {info['total_test']}") print("\nSamples per class (training):") for emotion, count in info["train"].items(): print(f" {emotion}: {count}")