| """ |
| FER-2013 Dataset Training Script — PyTorch Implementation |
| Trains a CNN for facial emotion recognition. |
| Usage: python train_face_model.py |
| """ |
| import os |
| import sys |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| from torch.utils.data import DataLoader, TensorDataset |
| import torch.nn.functional as F |
|
|
# Startup banner: emitted once, as soon as this module is executed or imported.
print(
    "=" * 60,
    " THE SENTINEL INTERFACE — Face Emotion Model Trainer",
    f" PyTorch {torch.__version__} | Device: CPU",
    "=" * 60,
    sep="\n",
)
|
|
# Put this file's directory at the front of the import path before importing
# the project-local ``models`` package (presumably located next to this file).
sys.path.insert(0, os.path.dirname(__file__))
from models.face_model import EMOTION_LABELS, build_cnn_model
|
|
|
|
def generate_synthetic_dataset(dataset_dir):
    """Write a synthetic stand-in dataset of 48x48 grayscale face sketches.

    For each of the seven emotion classes, 500 training and 100 test PNG files
    are written below ``dataset_dir`` in ``train/EMOTION/`` and
    ``test/EMOTION/``. Each image is uniform random noise whose contrast and
    brightness depend on the class, with a face outline, two eyes and a
    class-specific mouth drawn on top. The noise is not seeded, so the files
    differ from run to run.

    Args:
        dataset_dir: Directory under which the ``train`` and ``test`` trees
            are created.

    Returns:
        The ``(train_dir, test_dir)`` paths that were populated.
    """
    import cv2

    # class name -> (brightness bias, contrast gain) applied to the base noise.
    # Insertion order is also the order in which the classes are generated.
    class_styles = {
        "angry": (-30, 1.5),
        "disgust": (-20, 1.3),
        "fear": (10, 1.6),
        "happy": (30, 1.2),
        "sad": (-10, 0.8),
        "surprise": (20, 1.8),
        "neutral": (0, 1.0),
    }
    feature_gray = 220  # intensity used for eyes and mouth strokes

    def draw_mouth(canvas, label):
        """Draw the mouth shape associated with ``label`` onto ``canvas``."""
        if label == "happy":
            # Lower half of an ellipse: upturned corners.
            cv2.ellipse(canvas, (24, 32), (8, 4), 0, 0, 180, feature_gray, 1)
        elif label in ("sad", "fear"):
            # Upper half of an ellipse: downturned corners.
            cv2.ellipse(canvas, (24, 35), (8, 4), 0, 180, 360, feature_gray, 1)
        elif label == "surprise":
            cv2.circle(canvas, (24, 33), 4, feature_gray, 1)
        elif label == "angry":
            cv2.line(canvas, (18, 30), (30, 30), feature_gray, 1)
        else:
            # Remaining classes (disgust, neutral) get a flat line.
            cv2.line(canvas, (18, 32), (30, 32), feature_gray, 1)

    train_dir = os.path.join(dataset_dir, "train")
    test_dir = os.path.join(dataset_dir, "test")
    plan = (("train", train_dir, 500), ("test", test_dir, 100))

    for split, split_dir, n_samples in plan:
        for emotion, (bias, gain) in class_styles.items():
            emo_dir = os.path.join(split_dir, emotion)
            os.makedirs(emo_dir, exist_ok=True)
            for i in range(n_samples):
                noise = np.random.randint(80, 180, (48, 48), dtype=np.uint8)
                scaled = noise.astype(np.float32) * gain + bias
                img = np.clip(scaled, 0, 255).astype(np.uint8)

                cv2.ellipse(img, (24, 24), (16, 20), 0, 0, 360, 200, 1)  # face outline
                for eye_x in (18, 30):
                    cv2.circle(img, (eye_x, 20), 2, feature_gray, -1)
                draw_mouth(img, emotion)

                out_path = os.path.join(emo_dir, f"{emotion}_{i:04d}.png")
                cv2.imwrite(out_path, img)
        n_classes = len(class_styles)
        print(f"[Dataset] Generated {split}: {n_samples} x {n_classes} = {n_samples * n_classes} images")

    return train_dir, test_dir
|
|
|
|
def download_fer2013():
    """Locate training data next to this script, generating it if necessary.

    Despite the name, nothing is downloaded. The ``dataset`` directory beside
    this file is created if missing and then searched in this order:

    1. ``dataset/train`` and ``dataset/test`` both exist: return
       ``(train_dir, test_dir)``.
    2. ``dataset/fer2013.csv`` exists: return ``(csv_path, None)``.
    3. Otherwise a synthetic dataset is generated and its
       ``(train_dir, test_dir)`` is returned.

    Callers distinguish the CSV case by the ``None`` second element.
    """
    root = os.path.join(os.path.dirname(__file__), "dataset")
    os.makedirs(root, exist_ok=True)

    # Pre-split image folders take precedence over the raw CSV.
    split_dirs = tuple(os.path.join(root, split) for split in ("train", "test"))
    if all(os.path.exists(path) for path in split_dirs):
        print(f"[Dataset] Found existing dataset at {root}")
        return split_dirs

    csv_file = os.path.join(root, "fer2013.csv")
    if os.path.exists(csv_file):
        print(f"[Dataset] Found CSV at {csv_file}")
        return csv_file, None

    print("[Dataset] No existing dataset found. Generating synthetic training data...")
    return generate_synthetic_dataset(root)
|
|
|
|
def load_from_dirs(train_dir, test_dir):
    """Load grayscale images from class-structured directories.

    Each split directory is expected to contain one sub-directory per emotion
    name; the label of an image is the index of its emotion in the fixed list
    below. Images are read as grayscale, resized to 48x48 and scaled by 1/255.
    Missing class directories and files OpenCV cannot decode are skipped.

    Args:
        train_dir: Directory holding the training images.
        test_dir: Directory holding the test images. When it is falsy or does
            not exist, 20% of the training data is held out instead.

    Returns:
        ``X_train, X_test, y_train, y_test`` where the image arrays have shape
        ``(N, 1, 48, 48)``.
    """
    import cv2

    emotions = ["angry", "disgust", "fear", "happy", "sad", "surprise", "neutral"]
    image_exts = (".png", ".jpg", ".jpeg")

    def load_split(split_dir):
        """Return ``(images, labels)`` for every readable image in one split."""
        imgs, labels = [], []
        for idx, emo in enumerate(emotions):
            emo_path = os.path.join(split_dir, emo)
            # isdir rather than exists: a stray file named like a class would
            # otherwise make os.listdir raise.
            if not os.path.isdir(emo_path):
                continue
            # Match extensions case-insensitively (".PNG", ".JPG") and sort,
            # because os.listdir returns entries in arbitrary order and the
            # seeded fallback split below is only reproducible for a stable
            # sample order.
            files = sorted(
                f for f in os.listdir(emo_path) if f.lower().endswith(image_exts)
            )
            loaded = 0
            for fname in files:
                img = cv2.imread(os.path.join(emo_path, fname), cv2.IMREAD_GRAYSCALE)
                if img is None:
                    # imread signals an unreadable file by returning None.
                    continue
                img = cv2.resize(img, (48, 48))
                imgs.append(img.astype(np.float32) / 255.0)
                labels.append(idx)
                loaded += 1
            # Report what was actually decoded, not merely what was listed.
            print(f" [{emo}] {loaded} images")
            if loaded != len(files):
                print(f" [{emo}] skipped {len(files) - loaded} unreadable files")
        return np.array(imgs).reshape(-1, 1, 48, 48), np.array(labels)

    print("[Dataset] Loading training set...")
    X_train, y_train = load_split(train_dir)
    if test_dir and os.path.exists(test_dir):
        print("[Dataset] Loading test set...")
        X_test, y_test = load_split(test_dir)
    else:
        from sklearn.model_selection import train_test_split
        X_train, X_test, y_train, y_test = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42
        )
    return X_train, X_test, y_train, y_test
|
|
|
|
def load_from_csv(csv_path):
    """Read a FER-2013 style CSV and return a train/test split.

    The file must provide a ``pixels`` column (space-separated values, 2304
    per row) and an ``emotion`` column of class labels. Every row becomes a
    ``(1, 48, 48)`` float32 image scaled by 1/255.

    When a ``Usage`` column is present, rows marked ``"Training"`` form the
    training set and all remaining rows form the test set. Otherwise a seeded
    80/20 split is drawn with scikit-learn.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        ``X_train, X_test, y_train, y_test`` in that order.
    """
    import pandas as pd

    print("[Dataset] Loading CSV...")
    frame = pd.read_csv(csv_path)

    def to_image(pixel_text):
        """Parse one space-separated pixel string into a scaled image."""
        flat = np.fromstring(pixel_text, sep=" ")
        return flat.reshape(1, 48, 48).astype(np.float32) / 255.0

    X = np.stack([to_image(text) for text in frame["pixels"]])
    y = frame["emotion"].values

    if "Usage" not in frame.columns:
        from sklearn.model_selection import train_test_split
        return train_test_split(X, y, test_size=0.2, random_state=42)

    # Plain NumPy boolean masks; everything not tagged "Training" is test data.
    is_train = (frame["Usage"] == "Training").to_numpy()
    is_test = ~is_train
    return X[is_train], X[is_test], y[is_train], y[is_test]
|
|
|
|
def _run_epoch(model, loader, criterion, optimizer=None):
    """Make one pass over ``loader`` and return ``(summed_loss, accuracy)``.

    With an ``optimizer`` the model is switched to training mode and updated
    after every batch. Without one it is switched to eval mode and gradient
    tracking is disabled. The returned loss is the sum of the per-batch losses,
    not a per-sample mean.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is what eval() does
    loss_sum = 0.0
    hits = 0
    seen = 0
    with torch.set_grad_enabled(training):
        for inputs, targets in loader:
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            if training:
                loss.backward()
                optimizer.step()
            loss_sum += loss.item()
            _, predicted = outputs.max(1)
            seen += targets.size(0)
            hits += predicted.eq(targets).sum().item()
    return loss_sum, hits / seen


def train(*, epochs=30, batch_size=64, lr=0.0001):
    """Train the emotion CNN and checkpoint the best-scoring weights.

    Data comes from ``download_fer2013()``: a ``(csv_path, None)`` result is
    read with ``load_from_csv``, anything else with ``load_from_dirs``. The
    model from ``build_cnn_model()`` is optimised with Adam and cross-entropy;
    the learning rate is halved when the summed test loss stops improving for
    more than five epochs. After each epoch the weights are written to
    ``saved_models/face_emotion_cnn.pth`` beside this file whenever test
    accuracy improves. The first epoch always writes a checkpoint, so the file
    named in the final message is guaranteed to exist.

    No device placement is performed, so model and tensors stay on PyTorch's
    default device.

    Args:
        epochs: Number of passes over the training data (at least 1).
        batch_size: Mini-batch size for both loaders.
        lr: Initial Adam learning rate.

    Raises:
        ValueError: If ``epochs`` is below 1 or either split is empty.
    """
    if epochs < 1:
        raise ValueError(f"epochs must be at least 1, got {epochs}")

    source, companion = download_fer2013()
    if companion is None and str(source).endswith(".csv"):
        X_train, X_test, y_train, y_test = load_from_csv(source)
    else:
        X_train, X_test, y_train, y_test = load_from_dirs(source, companion)

    # Fail early with a clear message instead of an IndexError on the shape
    # print below or a ZeroDivisionError in the accuracy computation.
    if len(X_train) == 0 or len(X_test) == 0:
        raise ValueError(
            f"Dataset is empty (train={len(X_train)}, test={len(X_test)}); "
            "check the contents of the dataset directory"
        )

    print(f"\n[Data] Train: {len(X_train)} | Test: {len(X_test)} | Shape: {X_train[0].shape}")

    train_ds = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                             torch.tensor(y_train, dtype=torch.long))
    test_ds = TensorDataset(torch.tensor(X_test, dtype=torch.float32),
                            torch.tensor(y_test, dtype=torch.long))
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

    model = build_cnn_model()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)

    save_path = os.path.join(os.path.dirname(__file__), "saved_models", "face_emotion_cnn.pth")
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    # Start below any reachable accuracy so that even a 0.0 first epoch is
    # checkpointed; starting at 0 could finish without ever writing the file.
    best_acc = float("-inf")
    print(f"\n[Training] {epochs} epochs, batch_size={batch_size}")

    for epoch in range(1, epochs + 1):
        total_loss, train_acc = _run_epoch(model, train_loader, criterion, optimizer)
        test_loss, test_acc = _run_epoch(model, test_loader, criterion)

        scheduler.step(test_loss)

        print(f" Epoch {epoch:2d}/{epochs}: Train Acc={train_acc:.4f} | Test Acc={test_acc:.4f} | Loss={total_loss/len(train_loader):.4f}")

        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), save_path)
            print(f" → Saved best model (acc={best_acc:.4f})")

    print(f"\n[Done] Best test accuracy: {best_acc:.4f}")
    print(f"[Done] Model saved to: {save_path}")
|
|
|
|
# Script entry point: run the full training pipeline when executed directly.
if __name__ == "__main__":
    train()
|
|