# enhanced_expression_recognition.py import os import multiprocessing as mp mp.set_start_method("spawn", force=True) import torch import numpy as np from PIL import Image from tqdm import tqdm from torch import nn, optim from torch.utils.data import Dataset, DataLoader, Subset import torchvision.transforms as transforms from collections import Counter torch.set_float32_matmul_precision('high') # ============================================================ # CONFIG # ============================================================ DATASET_DIR = "./data/Expression_data" TRAIN_DIR = os.path.join( DATASET_DIR, "Facial_expression_train" ) TEST_DIR = os.path.join( DATASET_DIR, "Facial_expression_test" ) IMAGE_SIZE = 72 BATCH_SIZE = 64 NUM_EPOCHS = 60 LEARNING_RATE = 0.001 NUM_WORKERS = 0 MODEL_SAVE_PATH = "./models/expression_model.pth" PATIENCE = 12 # ============================================================ # DOWNLOAD DATASET # ============================================================ import urllib.request import zipfile def download_required_files(): dataset_url = ( "https://cdn.talentsprint.com/" "aiml/Experiment_related_data/" "Expression_data.zip" ) os.makedirs("./data", exist_ok=True) os.makedirs("./models", exist_ok=True) dataset_zip = "./data/Expression_data.zip" if not os.path.exists(dataset_zip): print("Downloading dataset...") urllib.request.urlretrieve( dataset_url, dataset_zip ) print("Dataset downloaded") else: print("Dataset zip already exists") if not os.path.exists(DATASET_DIR): print("Extracting dataset...") with zipfile.ZipFile(dataset_zip, 'r') as zip_ref: zip_ref.extractall("./data") print("Dataset extracted") else: print("Dataset already extracted") # ============================================================ # DEVICE SETUP # ============================================================ if torch.backends.mps.is_available(): device = torch.device("mps") print("Using Apple Silicon GPU (MPS)") elif torch.cuda.is_available(): device = torch.device("cuda") print("Using CUDA GPU") else: device = torch.device("cpu") print("Using CPU") # ============================================================ # DATASET # ============================================================ class ExpressionDataset(Dataset): def __init__(self, image_folder, transform=None): self.image_folder = image_folder self.transform = transform self.image_paths = [] self.labels = [] self.class_to_idx = {} self.idx_to_class = {} self.classes = sorted([ d for d in os.listdir(image_folder) if os.path.isdir( os.path.join(image_folder, d) ) ]) for idx, class_name in enumerate(self.classes): self.class_to_idx[class_name] = idx self.idx_to_class[idx] = class_name class_dir = os.path.join( image_folder, class_name ) for file_name in os.listdir(class_dir): if file_name.lower().endswith( (".jpg", ".jpeg", ".png") ): self.image_paths.append( os.path.join( class_dir, file_name ) ) self.labels.append(idx) def __len__(self): return len(self.image_paths) def __getitem__(self, index): image_path = self.image_paths[index] label = self.labels[index] image = Image.open( image_path ).convert("L") if self.transform: image = self.transform(image) return image, label # ============================================================ # TRANSFORMS # ============================================================ train_transforms = transforms.Compose([ transforms.Resize( (IMAGE_SIZE, IMAGE_SIZE) ), transforms.RandomHorizontalFlip( p=0.5 ), transforms.RandomRotation( 2 ), transforms.ToTensor(), transforms.Normalize( mean=[0.5], std=[0.5] ) ]) val_transforms = transforms.Compose([ transforms.Resize( (IMAGE_SIZE, IMAGE_SIZE) ), transforms.ToTensor(), transforms.Normalize( mean=[0.5], std=[0.5] ) ]) # ============================================================ # RESIDUAL BLOCK # ============================================================ class ResidualBlock(nn.Module): def __init__( self, in_channels, out_channels, stride=1 ): super().__init__() self.conv1 = nn.Conv2d( in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False ) self.bn1 = nn.BatchNorm2d( out_channels ) self.relu = nn.ReLU(inplace=True) self.conv2 = nn.Conv2d( out_channels, out_channels, kernel_size=3, padding=1, bias=False ) self.bn2 = nn.BatchNorm2d( out_channels ) self.shortcut = nn.Sequential() if stride != 1 or in_channels != out_channels: self.shortcut = nn.Sequential( nn.Conv2d( in_channels, out_channels, kernel_size=1, stride=stride, bias=False ), nn.BatchNorm2d( out_channels ) ) def forward(self, x): identity = self.shortcut(x) out = self.conv1(x) out = self.bn1(out) out = self.relu(out) out = self.conv2(out) out = self.bn2(out) out += identity out = self.relu(out) return out # ============================================================ # MODEL # ============================================================ class ExpressionCNN(nn.Module): def __init__(self, num_classes=7): super().__init__() self.features = nn.Sequential( nn.Conv2d( 1, 32, kernel_size=3, padding=1 ), nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(2), ResidualBlock(32, 64), nn.MaxPool2d(2), ResidualBlock(64, 128), nn.MaxPool2d(2), ResidualBlock(128, 256), nn.MaxPool2d(2), ResidualBlock(256, 512), nn.AdaptiveAvgPool2d((1, 1)) ) self.classifier = nn.Sequential( nn.Flatten(), nn.Linear(512, 256), nn.ReLU(), nn.Dropout(0.4), nn.Linear(256, 128), nn.ReLU(), nn.Dropout(0.3), nn.Linear(128, num_classes) ) def forward(self, x): x = self.features(x) x = self.classifier(x) return x # ============================================================ # PREDICTION # ============================================================ def predict_expression( model, image_path, transform, idx_to_class ): image = Image.open( image_path ).convert("L") tensor = transform(image) tensor = tensor.unsqueeze(0).to(device) with torch.no_grad(): outputs = model(tensor) probabilities = torch.softmax( outputs, dim=1 ) confidence, predicted = torch.max( probabilities, 1 ) predicted_class = idx_to_class[ predicted.item() ] return predicted_class, confidence.item() # ============================================================ # MAIN # ============================================================ def main(): download_required_files() print("\nLoading dataset...\n") full_train_dataset = ExpressionDataset( TRAIN_DIR, transform=train_transforms ) full_val_dataset = ExpressionDataset( TRAIN_DIR, transform=val_transforms ) indices = np.arange( len(full_train_dataset) ) np.random.shuffle(indices) val_size = int( 0.1 * len(indices) ) val_indices = indices[:val_size] train_indices = indices[val_size:] train_dataset = Subset( full_train_dataset, train_indices ) val_dataset = Subset( full_val_dataset, val_indices ) print( f"Training images: {len(train_dataset)}" ) print( f"Validation images: {len(val_dataset)}" ) print( f"Classes: " f"{full_train_dataset.classes}" ) # ======================================================== # CLASS WEIGHTS # ======================================================== label_counts = Counter( full_train_dataset.labels ) weights = [] for i in range( len(full_train_dataset.classes) ): weight = np.sqrt( len(full_train_dataset.labels) / label_counts[i] ) weights.append(weight) weights = torch.tensor( weights, dtype=torch.float32 ).to(device) # ======================================================== # DATALOADERS # ======================================================== train_loader = DataLoader( train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS ) val_loader = DataLoader( val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS ) # ======================================================== # MODEL # ======================================================== model = ExpressionCNN( num_classes=len( full_train_dataset.classes ) ).to(device) print( f"\nModel Device: " f"{next(model.parameters()).device}" ) criterion = nn.CrossEntropyLoss( weight=weights, label_smoothing=0.1 ) optimizer = optim.Adam( model.parameters(), lr=LEARNING_RATE ) scheduler = optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode="max", factor=0.5, patience=4 ) # ======================================================== # TRAINING LOOP # ======================================================== best_accuracy = 0.0 epochs_without_improvement = 0 print("\nStarting Training...\n") for epoch in range(NUM_EPOCHS): model.train() correct_train = 0 total_train = 0 train_bar = tqdm(train_loader) for images, labels in train_bar: images = images.to(device) labels = labels.to(device) optimizer.zero_grad() outputs = model(images) loss = criterion( outputs, labels ) loss.backward() optimizer.step() _, predicted = torch.max( outputs, 1 ) total_train += labels.size(0) correct_train += ( predicted == labels ).sum().item() train_bar.set_description( f"Epoch {epoch+1}/{NUM_EPOCHS} " f"Loss: {loss.item():.4f}" ) train_accuracy = ( 100 * correct_train / total_train ) # ==================================================== # VALIDATION # ==================================================== model.eval() correct_val = 0 total_val = 0 with torch.no_grad(): for images, labels in val_loader: images = images.to(device) labels = labels.to(device) outputs = model(images) _, predicted = torch.max( outputs, 1 ) total_val += labels.size(0) correct_val += ( predicted == labels ).sum().item() val_accuracy = ( 100 * correct_val / total_val ) scheduler.step(val_accuracy) print( f"Epoch [{epoch+1}/{NUM_EPOCHS}] " f"Train Accuracy: " f"{train_accuracy:.2f}% | " f"Validation Accuracy: " f"{val_accuracy:.2f}%" ) # ==================================================== # SAVE BEST MODEL # ==================================================== if val_accuracy > best_accuracy: best_accuracy = val_accuracy epochs_without_improvement = 0 torch.save( { "model_state_dict": model.state_dict(), "class_to_idx": full_train_dataset.class_to_idx, "idx_to_class": full_train_dataset.idx_to_class, "accuracy": best_accuracy }, MODEL_SAVE_PATH ) print( f"Best model saved " f"with accuracy: " f"{best_accuracy:.2f}%" ) else: epochs_without_improvement += 1 # ==================================================== # EARLY STOPPING # ==================================================== if epochs_without_improvement >= PATIENCE: print("\nEarly stopping triggered") break print("\nTraining Complete") print( f"Best Validation Accuracy: " f"{best_accuracy:.2f}%" ) if __name__ == "__main__": import multiprocessing multiprocessing.freeze_support() main()