import gradio as gr import torch import torch.nn as nn import torch.nn.functional as F import torchvision.transforms as transforms import numpy as np from PIL import Image import cv2 import mediapipe as mp import os import requests from efficientnet_pytorch import EfficientNet # Define paths and URLs MODEL_WEIGHTS_URL = "https://huggingface.co/Sakibrumu/Quad_Stream_Face_Emotion_Classifier/resolve/main/quad_stream_model_rafdb.pth" MODEL_WEIGHTS_PATH = "best_model.pth" # Download model weights from Hugging Face Model Hub def download_model_weights(): if not os.path.exists(MODEL_WEIGHTS_PATH): print(f"Downloading model weights from {MODEL_WEIGHTS_URL}...") try: response = requests.get(MODEL_WEIGHTS_URL, stream=True, timeout=30) response.raise_for_status() with open(MODEL_WEIGHTS_PATH, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) print("Model weights downloaded successfully.") except Exception as e: print(f"Failed to download model weights: {e}") raise RuntimeError("Model weights download failed.") else: print("Model weights already exist locally.") download_model_weights() # Initialize MediaPipe Face Mesh mp_face_mesh = mp.solutions.face_mesh face_mesh = mp_face_mesh.FaceMesh( max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) # Class mapping for RAF-DB class_mapping = { 0: "Surprise", 1: "Fear", 2: "Disgust", 3: "Happiness", 4: "Sadness", 5: "Anger", 6: "Neutral" } # Transform for input images transform = transforms.Compose([ transforms.Resize((224, 224)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) # Function to extract landmark features using MediaPipe def extract_landmark_features(image): image_np = np.array(image) h, w = image_np.shape[:2] image_rgb = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) results = face_mesh.process(image_rgb) if not results.multi_face_landmarks: return np.zeros(14, dtype=np.float32) landmarks = results.multi_face_landmarks[0].landmark # Map MediaPipe landmarks to approximate dlib indices key_points = { 'left_eye': (landmarks[159].x * w, landmarks[159].y * h), 'right_eye': (landmarks[386].x * w, landmarks[386].y * h), 'nose_tip': (landmarks[1].x * w, landmarks[1].y * h), 'mouth_left': (landmarks[61].x * w, landmarks[61].y * h), 'mouth_right': (landmarks[291].x * w, landmarks[291].y * h), 'left_eyebrow': (landmarks[70].x * w, landmarks[70].y * h), 'right_eyebrow': (landmarks[300].x * w, landmarks[300].y * h), 'jaw_left': (landmarks[172].x * w, landmarks[172].y * h), 'jaw_right': (landmarks[397].x * w, landmarks[397].y * h), 'chin': (landmarks[152].x * w, landmarks[152].y * h), 'left_lower_eyelid': (landmarks[145].x * w, landmarks[145].y * h), 'right_lower_eyelid': (landmarks[374].x * w, landmarks[374].y * h), 'left_cheek': (landmarks[137].x * w, landmarks[137].y * h), 'right_cheek': (landmarks[366].x * w, landmarks[366].y * h) } features = [] eye_dist = np.sqrt((key_points['left_eye'][0] - key_points['right_eye'][0])**2 + (key_points['left_eye'][1] - key_points['right_eye'][1])**2) features.append(eye_dist) mouth_width = np.sqrt((key_points['mouth_left'][0] - key_points['mouth_right'][0])**2 + (key_points['mouth_left'][1] - key_points['mouth_right'][1])**2) features.append(mouth_width) nose_to_mouth_left = np.sqrt((key_points['nose_tip'][0] - key_points['mouth_left'][0])**2 + (key_points['nose_tip'][1] - key_points['mouth_left'][1])**2) nose_to_mouth_right = np.sqrt((key_points['nose_tip'][0] - key_points['mouth_right'][0])**2 + (key_points['nose_tip'][1] - key_points['mouth_right'][1])**2) features.extend([nose_to_mouth_left, nose_to_mouth_right]) left_eye_to_nose = np.sqrt((key_points['left_eye'][0] - key_points['nose_tip'][0])**2 + (key_points['left_eye'][1] - key_points['nose_tip'][1])**2) right_eye_to_nose = np.sqrt((key_points['right_eye'][0] - key_points['nose_tip'][0])**2 + (key_points['right_eye'][1] - key_points['nose_tip'][1])**2) features.extend([left_eye_to_nose, right_eye_to_nose]) vec1 = np.array([key_points['left_eye'][0] - key_points['nose_tip'][0], key_points['left_eye'][1] - key_points['nose_tip'][1]]) vec2 = np.array([key_points['right_eye'][0] - key_points['nose_tip'][0], key_points['right_eye'][1] - key_points['nose_tip'][1]]) cos_angle = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2) + 1e-8) angle = np.arccos(np.clip(cos_angle, -1.0, 1.0)) features.append(angle) mouth_center = ((key_points['mouth_left'][0] + key_points['mouth_right'][0]) / 2, (key_points['mouth_left'][1] + key_points['mouth_right'][1]) / 2) mouth_to_left_eye = np.sqrt((mouth_center[0] - key_points['left_eye'][0])**2 + (mouth_center[1] - key_points['left_eye'][1])**2) mouth_to_right_eye = np.sqrt((mouth_center[0] - key_points['right_eye'][0])**2 + (mouth_center[1] - key_points['right_eye'][1])**2) features.extend([mouth_to_left_eye, mouth_to_right_eye]) mouth_aspect_ratio = mouth_width / (nose_to_mouth_left + nose_to_mouth_right + 1e-8) features.append(mouth_aspect_ratio) left_eyebrow_to_eye = np.sqrt((key_points['left_eyebrow'][0] - key_points['left_eye'][0])**2 + (key_points['left_eyebrow'][1] - key_points['left_eye'][1])**2) right_eyebrow_to_eye = np.sqrt((key_points['right_eyebrow'][0] - key_points['right_eye'][0])**2 + (key_points['right_eyebrow'][1] - key_points['right_eye'][1])**2) features.extend([left_eyebrow_to_eye, right_eyebrow_to_eye]) left_au6 = np.sqrt((key_points['left_lower_eyelid'][0] - key_points['left_cheek'][0])**2 + (key_points['left_lower_eyelid'][1] - key_points['left_cheek'][1])**2) right_au6 = np.sqrt((key_points['right_lower_eyelid'][0] - key_points['right_cheek'][0])**2 + (key_points['right_lower_eyelid'][1] - key_points['right_cheek'][1])**2) avg_au6 = (left_au6 + right_au6) / 2 features.append(avg_au6) mouth_left_to_chin = np.sqrt((key_points['mouth_left'][0] - key_points['chin'][0])**2 + (key_points['mouth_left'][1] - key_points['chin'][1])**2) mouth_right_to_chin = np.sqrt((key_points['mouth_right'][0] - key_points['chin'][0])**2 + (key_points['mouth_right'][1] - key_points['chin'][1])**2) avg_au12 = (mouth_left_to_chin + mouth_right_to_chin) / (2 * (mouth_width + 1e-8)) features.append(avg_au12) return np.array(features, dtype=np.float32) # Function to get landmark mask using MediaPipe def get_landmark_mask(image, target_size=(7, 7)): image_np = np.array(image) h, w = image_np.shape[:2] image_rgb = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) results = face_mesh.process(image_rgb) if not results.multi_face_landmarks: return np.ones(target_size, dtype=np.float32) landmarks = results.multi_face_landmarks[0].landmark mask = np.zeros((h, w), dtype=np.float32) key_indices = [ 159, 386, # Eyes 145, 374, # Lower eyelids 61, 291, 80, 310, # Mouth 70, 300, # Eyebrows 172, 397, 152, # Jaw/Chin 137, 366 # Cheeks ] key_points = [(landmarks[i].x * w, landmarks[i].y * h) for i in key_indices] for i, (x, y) in enumerate(key_points): radius = 30 if i in [4, 5, 6, 7, 12, 13] else 20 cv2.circle(mask, (int(x), int(y)), radius, 1.0, -1) mask = cv2.resize(mask, target_size, interpolation=cv2.INTER_LINEAR) mask = np.clip(mask, 0, 1) return mask # Model definitions (unchanged) class EfficientNetBackbone(nn.Module): def __init__(self): super(EfficientNetBackbone, self).__init__() self.efficientnet = EfficientNet.from_pretrained('efficientnet-b4') self.efficientnet._conv_stem = nn.Conv2d(3, 48, kernel_size=3, stride=2, padding=1, bias=False) self.channel_reducer = nn.Conv2d(1792, 256, kernel_size=1, stride=1, padding=0, bias=False) self.bn = nn.BatchNorm2d(256) nn.init.xavier_uniform_(self.channel_reducer.weight) def forward(self, x): x = self.efficientnet.extract_features(x) x = self.channel_reducer(x) x = self.bn(x) return x class HLA(nn.Module): def __init__(self, in_channels=256, reduction=4): super(HLA, self).__init__() reduced_channels = in_channels // reduction self.spatial_branch1 = nn.Conv2d(in_channels, reduced_channels, 1) self.spatial_branch2 = nn.Conv2d(in_channels, reduced_channels, 1) self.sigmoid = nn.Sigmoid() self.channel_restore = nn.Conv2d(reduced_channels, in_channels, 1) self.channel_attention = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Conv2d(in_channels, in_channels // reduction, 1, bias=False), nn.ReLU(), nn.Conv2d(in_channels // reduction, in_channels, 1, bias=False), nn.Sigmoid() ) self.bn = nn.BatchNorm2d(in_channels, eps=1e-5) self.dropout = nn.Dropout2d(0.2) def forward(self, x, landmark_mask=None): b1 = self.spatial_branch1(x) b2 = self.spatial_branch2(x) spatial_attn = self.sigmoid(torch.max(b1, b2)) spatial_attn = self.channel_restore(spatial_attn) if landmark_mask is not None: landmark_mask = torch.tensor(landmark_mask, dtype=x.dtype) landmark_mask = landmark_mask.view(-1, 1, 7, 7) spatial_attn = spatial_attn * landmark_mask spatial_attn = self.dropout(spatial_attn) spatial_out = x * spatial_attn channel_attn = self.channel_attention(spatial_out) channel_attn = self.dropout(channel_attn) out = spatial_out * channel_attn out = self.bn(out) return out class ViT(nn.Module): def __init__(self, in_channels=256, patch_size=1, embed_dim=768, num_layers=8, num_heads=12): super(ViT, self).__init__() self.patch_embed = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size) self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) num_patches = (7 // patch_size) * (7 // patch_size) self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim)) self.transformer = nn.ModuleList([ nn.TransformerEncoderLayer(embed_dim, num_heads, dim_feedforward=1536, activation="gelu") for _ in range(num_layers) ]) self.ln = nn.LayerNorm(embed_dim) self.bn = nn.BatchNorm1d(embed_dim, eps=1e-5) nn.init.xavier_uniform_(self.patch_embed.weight) nn.init.zeros_(self.patch_embed.bias) nn.init.normal_(self.cls_token, std=0.02) nn.init.normal_(self.pos_embed, std=0.02) def forward(self, x): x = self.patch_embed(x) x = x.flatten(2).transpose(1, 2) cls_tokens = self.cls_token.expand(x.size(0), -1, -1) x = torch.cat([cls_tokens, x], dim=1) x = x + self.pos_embed for layer in self.transformer: x = layer(x) x = x[:, 0] x = self.ln(x) x = self.bn(x) return x class IntensityStream(nn.Module): def __init__(self, in_channels=256): super(IntensityStream, self).__init__() sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32) sobel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], dtype=torch.float32) self.sobel_x = nn.Conv2d(in_channels, in_channels, 3, padding=1, bias=False, groups=in_channels) self.sobel_y = nn.Conv2d(in_channels, in_channels, 3, padding=1, bias=False, groups=in_channels) self.sobel_x.weight.data = sobel_x.repeat(in_channels, 1, 1, 1) self.sobel_y.weight.data = sobel_y.repeat(in_channels, 1, 1, 1) self.conv = nn.Conv2d(in_channels, 128, 3, padding=1) self.bn = nn.BatchNorm2d(128, eps=1e-5) self.pool = nn.AdaptiveAvgPool2d(1) self.attention = nn.MultiheadAttention(embed_dim=128, num_heads=1) nn.init.xavier_uniform_(self.conv.weight) nn.init.zeros_(self.conv.bias) def forward(self, x): gx = self.sobel_x(x) gy = self.sobel_y(x) grad_magnitude = torch.sqrt(gx**2 + gy**2 + 1e-8) variance = ((x - x.mean(dim=1, keepdim=True))**2).mean(dim=1).flatten(1) cnn_out = F.relu(self.conv(grad_magnitude)) cnn_out = self.bn(cnn_out) texture_out = self.pool(cnn_out).squeeze(-1).squeeze(-1) attn_in = cnn_out.flatten(2).permute(2, 0, 1) attn_in = attn_in / (attn_in.norm(dim=-1, keepdim=True) + 1e-8) attn_out, _ = self.attention(attn_in, attn_in, attn_in) context_out = attn_out.mean(dim=0) out = torch.cat([texture_out, context_out], dim=1) return out, grad_magnitude, variance class LandmarkStream(nn.Module): def __init__(self, input_dim=14, embed_dim=768): super(LandmarkStream, self).__init__() self.fc1 = nn.Linear(input_dim, 128) self.fc2 = nn.Linear(128, 256) self.fc3 = nn.Linear(256, embed_dim) self.bn1 = nn.BatchNorm1d(128) self.bn2 = nn.BatchNorm1d(256) self.bn3 = nn.BatchNorm1d(embed_dim) self.dropout = nn.Dropout(0.4) nn.init.xavier_uniform_(self.fc1.weight) nn.init.zeros_(self.fc1.bias) nn.init.xavier_uniform_(self.fc2.weight) nn.init.zeros_(self.fc2.bias) nn.init.xavier_uniform_(self.fc3.weight) nn.init.zeros_(self.fc3.bias) def forward(self, x): x = F.relu(self.bn1(self.fc1(x))) x = self.dropout(x) x = F.relu(self.bn2(self.fc2(x))) x = self.dropout(x) x = self.bn3(self.fc3(x)) return x class QuadStreamHLAViT(nn.Module): def __init__(self, num_classes=7): super(QuadStreamHLAViT, self).__init__() self.backbone = EfficientNetBackbone() self.hla = HLA() self.vit = ViT() self.intensity = IntensityStream() self.landmark = LandmarkStream(input_dim=14, embed_dim=768) self.fc_hla = nn.Linear(256*7*7, 768) self.fc_intensity = nn.Linear(256, 768) self.fusion_fc = nn.Linear(768*4, 512) self.bn_fusion = nn.BatchNorm1d(512, eps=1e-5) self.dropout = nn.Dropout(0.6) self.classifier = nn.Linear(512, num_classes) nn.init.xavier_uniform_(self.fc_hla.weight) nn.init.zeros_(self.fc_hla.bias) nn.init.xavier_uniform_(self.fc_intensity.weight) nn.init.zeros_(self.fc_intensity.bias) nn.init.xavier_uniform_(self.fusion_fc.weight) nn.init.zeros_(self.fusion_fc.bias) nn.init.xavier_uniform_(self.classifier.weight) nn.init.zeros_(self.classifier.bias) def forward(self, x, landmark_features, landmark_mask=None): features = self.backbone(x) hla_out = self.hla(features, landmark_mask) vit_out = self.vit(features) intensity_out, grad_magnitude, variance = self.intensity(features) landmark_out = self.landmark(landmark_features) hla_flat = self.fc_hla(hla_out.view(-1, 256*7*7)) intensity_flat = self.fc_intensity(intensity_out) fused = torch.cat([hla_flat, vit_out, intensity_flat, landmark_out], dim=1) fused = F.relu(self.fusion_fc(fused)) fused = self.bn_fusion(fused) fused = self.dropout(fused) logits = self.classifier(fused) return logits, hla_out, vit_out, grad_magnitude, variance # Load model model = QuadStreamHLAViT(num_classes=7) try: model.load_state_dict(torch.load(MODEL_WEIGHTS_PATH, map_location=torch.device('cpu'), weights_only=True)) print("Model weights loaded successfully.") except Exception as e: print(f"Error loading model weights: {e}") raise RuntimeError("Failed to load model weights.") model.eval() # Inference function def predict_emotion(image): try: # Convert image to RGB if isinstance(image, np.ndarray): image = Image.fromarray(image) image = image.convert("RGB") # Extract landmarks and mask lm_features = extract_landmark_features(image) lm_mask = get_landmark_mask(image) # Transform image img_tensor = transform(image).unsqueeze(0) lm_features_tensor = torch.tensor(lm_features, dtype=torch.float32).unsqueeze(0) # Run inference with torch.no_grad(): outputs, _, _, _, _ = model(img_tensor, lm_features_tensor, lm_mask) probs = F.softmax(outputs, dim=1)[0] pred_label = torch.argmax(probs).item() pred_emotion = class_mapping[pred_label] # Format probabilities prob_dict = {class_mapping[i]: f"{probs[i].item():.4f}" for i in range(len(class_mapping))} return pred_emotion, prob_dict except Exception as e: return "Error", {"Message": f"Failed to process image: {str(e)}"} # Gradio interface iface = gr.Interface( fn=predict_emotion, inputs=gr.Image(type="pil", label="Upload an Image"), outputs=[ gr.Textbox(label="Predicted Emotion"), gr.JSON(label="Emotion Probabilities") ], title="Facial Emotion Recognition with QuadStreamHLAViT", description="Upload an image to predict facial emotions (Surprise, Fear, Disgust, Happiness, Sadness, Anger, Neutral) using a QuadStreamHLAViT model trained on RAF-DB. Model accuracy: 82.31%.", allow_flagging="never" ) # Clean up MediaPipe def cleanup(): face_mesh.close() import atexit atexit.register(cleanup) # Launch the app if __name__ == "__main__": iface.launch()