""" FER-2013 Dataset Training Script — PyTorch Implementation Trains a CNN for facial emotion recognition. Usage: python train_face_model.py """ import os import sys import numpy as np import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset import torch.nn.functional as F print("=" * 60) print(" THE SENTINEL INTERFACE — Face Emotion Model Trainer") print(f" PyTorch {torch.__version__} | Device: CPU") print("=" * 60) sys.path.insert(0, os.path.dirname(__file__)) from models.face_model import build_cnn_model, EMOTION_LABELS def generate_synthetic_dataset(dataset_dir): """Generate synthetic dataset when FER-2013 is unavailable.""" import cv2 emotions = ["angry", "disgust", "fear", "happy", "sad", "surprise", "neutral"] train_dir = os.path.join(dataset_dir, "train") test_dir = os.path.join(dataset_dir, "test") emotion_patterns = { "angry": {"bias": -30, "contrast": 1.5}, "disgust": {"bias": -20, "contrast": 1.3}, "fear": {"bias": 10, "contrast": 1.6}, "happy": {"bias": 30, "contrast": 1.2}, "sad": {"bias": -10, "contrast": 0.8}, "surprise": {"bias": 20, "contrast": 1.8}, "neutral": {"bias": 0, "contrast": 1.0}, } for split, split_dir, n_samples in [("train", train_dir, 500), ("test", test_dir, 100)]: for emotion in emotions: emo_dir = os.path.join(split_dir, emotion) os.makedirs(emo_dir, exist_ok=True) p = emotion_patterns[emotion] for i in range(n_samples): img = np.random.randint(80, 180, (48, 48), dtype=np.uint8) img = np.clip(img.astype(np.float32) * p["contrast"] + p["bias"], 0, 255).astype(np.uint8) cv2.ellipse(img, (24, 24), (16, 20), 0, 0, 360, 200, 1) cv2.circle(img, (18, 20), 2, 220, -1) cv2.circle(img, (30, 20), 2, 220, -1) if emotion == "happy": cv2.ellipse(img, (24, 32), (8, 4), 0, 0, 180, 220, 1) elif emotion in ("sad", "fear"): cv2.ellipse(img, (24, 35), (8, 4), 0, 180, 360, 220, 1) elif emotion == "surprise": cv2.circle(img, (24, 33), 4, 220, 1) elif emotion == "angry": cv2.line(img, (18, 30), (30, 30), 220, 1) else: cv2.line(img, (18, 32), (30, 32), 220, 1) cv2.imwrite(os.path.join(emo_dir, f"{emotion}_{i:04d}.png"), img) print(f"[Dataset] Generated {split}: {n_samples} x {len(emotions)} = {n_samples * len(emotions)} images") return train_dir, test_dir def download_fer2013(): """Try to download FER-2013, fall back to synthetic.""" dataset_dir = os.path.join(os.path.dirname(__file__), "dataset") os.makedirs(dataset_dir, exist_ok=True) csv_path = os.path.join(dataset_dir, "fer2013.csv") # Check for existing directories train_dir = os.path.join(dataset_dir, "train") test_dir = os.path.join(dataset_dir, "test") if os.path.exists(train_dir) and os.path.exists(test_dir): print(f"[Dataset] Found existing dataset at {dataset_dir}") return train_dir, test_dir # Check for CSV if os.path.exists(csv_path): print(f"[Dataset] Found CSV at {csv_path}") return csv_path, None # NOTE: To use real FER-2013, download from Kaggle and place in dataset/train/ and dataset/test/ # Skipping kagglehub auto-download (requires Kaggle API credentials) print("[Dataset] No existing dataset found. Generating synthetic training data...") return generate_synthetic_dataset(dataset_dir) def load_from_dirs(train_dir, test_dir): """Load images from class-structured directories.""" import cv2 emotions = ["angry", "disgust", "fear", "happy", "sad", "surprise", "neutral"] def load_split(split_dir): imgs, labels = [], [] for idx, emo in enumerate(emotions): emo_path = os.path.join(split_dir, emo) if not os.path.exists(emo_path): continue files = [f for f in os.listdir(emo_path) if f.endswith(('.png', '.jpg', '.jpeg'))] for fname in files: img = cv2.imread(os.path.join(emo_path, fname), cv2.IMREAD_GRAYSCALE) if img is not None: img = cv2.resize(img, (48, 48)) imgs.append(img.astype(np.float32) / 255.0) labels.append(idx) print(f" [{emo}] {len(files)} images") return np.array(imgs).reshape(-1, 1, 48, 48), np.array(labels) print("[Dataset] Loading training set...") X_train, y_train = load_split(train_dir) if test_dir and os.path.exists(test_dir): print("[Dataset] Loading test set...") X_test, y_test = load_split(test_dir) else: from sklearn.model_selection import train_test_split X_train, X_test, y_train, y_test = train_test_split(X_train, y_train, test_size=0.2, random_state=42) return X_train, X_test, y_train, y_test def load_from_csv(csv_path): """Load FER-2013 CSV.""" import pandas as pd print("[Dataset] Loading CSV...") df = pd.read_csv(csv_path) pixels = df["pixels"].apply(lambda x: np.fromstring(x, sep=" ").reshape(1, 48, 48).astype(np.float32) / 255.0) X = np.stack(pixels.values) y = df["emotion"].values if "Usage" in df.columns: train_mask = df["Usage"] == "Training" return X[train_mask], X[~train_mask], y[train_mask], y[~train_mask] from sklearn.model_selection import train_test_split return train_test_split(X, y, test_size=0.2, random_state=42) def train(): result = download_fer2013() if isinstance(result, tuple) and result[1] is not None and os.path.isdir(str(result[0])): X_train, X_test, y_train, y_test = load_from_dirs(result[0], result[1]) elif isinstance(result, tuple) and isinstance(result[0], str) and result[0].endswith(".csv"): X_train, X_test, y_train, y_test = load_from_csv(result[0]) else: X_train, X_test, y_train, y_test = load_from_dirs(result[0], result[1]) print(f"\n[Data] Train: {len(X_train)} | Test: {len(X_test)} | Shape: {X_train[0].shape}") # Create DataLoaders train_ds = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long)) test_ds = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long)) train_loader = DataLoader(train_ds, batch_size=64, shuffle=True) test_loader = DataLoader(test_ds, batch_size=64, shuffle=False) # Build model model = build_cnn_model() criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=0.0001) scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5) save_path = os.path.join(os.path.dirname(__file__), "saved_models", "face_emotion_cnn.pth") os.makedirs(os.path.dirname(save_path), exist_ok=True) best_acc = 0 epochs = 30 print(f"\n[Training] {epochs} epochs, batch_size=64") for epoch in range(1, epochs + 1): # Train model.train() total_loss = 0 correct = 0 total = 0 for X_batch, y_batch in train_loader: optimizer.zero_grad() outputs = model(X_batch) loss = criterion(outputs, y_batch) loss.backward() optimizer.step() total_loss += loss.item() _, predicted = outputs.max(1) total += y_batch.size(0) correct += predicted.eq(y_batch).sum().item() train_acc = correct / total # Evaluate model.eval() test_correct = 0 test_total = 0 test_loss = 0 with torch.no_grad(): for X_batch, y_batch in test_loader: outputs = model(X_batch) loss = criterion(outputs, y_batch) test_loss += loss.item() _, predicted = outputs.max(1) test_total += y_batch.size(0) test_correct += predicted.eq(y_batch).sum().item() test_acc = test_correct / test_total scheduler.step(test_loss) print(f" Epoch {epoch:2d}/{epochs}: Train Acc={train_acc:.4f} | Test Acc={test_acc:.4f} | Loss={total_loss/len(train_loader):.4f}") if test_acc > best_acc: best_acc = test_acc torch.save(model.state_dict(), save_path) print(f" → Saved best model (acc={best_acc:.4f})") print(f"\n[Done] Best test accuracy: {best_acc:.4f}") print(f"[Done] Model saved to: {save_path}") if __name__ == "__main__": train()