# Hugging Face Spaces app: QuadStreamHLAViT facial emotion recognition.
# (The Space page previously reported a build error — see the Gradio
# flagging-keyword fix below.)
| import gradio as gr | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import torchvision.transforms as transforms | |
| import numpy as np | |
| from PIL import Image | |
| import cv2 | |
| import mediapipe as mp | |
| import os | |
| import requests | |
| from efficientnet_pytorch import EfficientNet | |
# Location of the fine-tuned checkpoint on the Hugging Face Model Hub, and
# the local path it is cached at between restarts.
MODEL_WEIGHTS_URL = "https://huggingface.co/Sakibrumu/Quad_Stream_Face_Emotion_Classifier/resolve/main/quad_stream_model_rafdb.pth"
MODEL_WEIGHTS_PATH = "best_model.pth"

def download_model_weights():
    """Download the model checkpoint to MODEL_WEIGHTS_PATH if not present.

    Streams the download into a temporary ``.part`` file and atomically
    renames it on success, so an interrupted download can never leave a
    truncated checkpoint that a later start-up would mistake for valid
    weights (the previous version wrote directly to the final path).

    Raises:
        RuntimeError: if the download fails for any reason.
    """
    if os.path.exists(MODEL_WEIGHTS_PATH):
        print("Model weights already exist locally.")
        return
    tmp_path = MODEL_WEIGHTS_PATH + ".part"
    print(f"Downloading model weights from {MODEL_WEIGHTS_URL}...")
    try:
        response = requests.get(MODEL_WEIGHTS_URL, stream=True, timeout=30)
        response.raise_for_status()
        with open(tmp_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        # Atomic rename: either the complete file appears or nothing does.
        os.replace(tmp_path, MODEL_WEIGHTS_PATH)
        print("Model weights downloaded successfully.")
    except Exception as e:
        # Remove any partial file so the next start retries the download.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        print(f"Failed to download model weights: {e}")
        raise RuntimeError("Model weights download failed.") from e

download_model_weights()
# Initialize MediaPipe Face Mesh once at module level and reuse it for every
# request (constructing it is expensive; `refine_landmarks` adds the finer
# iris/lip detail points).
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    max_num_faces=1,            # only the most prominent face is analysed
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)
# RAF-DB emotion labels, keyed by the classifier's output index.
class_mapping = dict(enumerate([
    "Surprise", "Fear", "Disgust", "Happiness", "Sadness", "Anger", "Neutral",
]))

# Preprocessing: resize to the backbone's 224x224 input size, then apply the
# standard ImageNet channel normalisation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# Function to extract landmark features using MediaPipe
def extract_landmark_features(image):
    """Compute a 14-dimensional geometric feature vector from face landmarks.

    The features (pairwise key-point distances, an eye-nose angle, and two
    action-unit-style ratios) feed the LandmarkStream branch of the model.

    Args:
        image: RGB PIL image (anything ``np.array`` turns into an HxWx3 RGB
            array).

    Returns:
        np.ndarray of shape (14,), dtype float32; all zeros when MediaPipe
        detects no face.
    """
    image_np = np.array(image)
    h, w = image_np.shape[:2]
    # MediaPipe Face Mesh expects an RGB frame and `image` is already RGB,
    # so it is passed through unchanged.  (The previous RGB->BGR conversion
    # here — misleadingly named `image_rgb` — fed MediaPipe a BGR frame and
    # degraded landmark detection.)
    results = face_mesh.process(image_np)
    if not results.multi_face_landmarks:
        return np.zeros(14, dtype=np.float32)
    landmarks = results.multi_face_landmarks[0].landmark

    def pt(i):
        # Landmark i in pixel coordinates.
        return (landmarks[i].x * w, landmarks[i].y * h)

    def dist(p, q):
        # Euclidean distance between two (x, y) points.
        return float(np.hypot(p[0] - q[0], p[1] - q[1]))

    # Approximate dlib-style key points mapped to MediaPipe mesh indices.
    key_points = {
        'left_eye': pt(159),
        'right_eye': pt(386),
        'nose_tip': pt(1),
        'mouth_left': pt(61),
        'mouth_right': pt(291),
        'left_eyebrow': pt(70),
        'right_eyebrow': pt(300),
        'jaw_left': pt(172),
        'jaw_right': pt(397),
        'chin': pt(152),
        'left_lower_eyelid': pt(145),
        'right_lower_eyelid': pt(374),
        'left_cheek': pt(137),
        'right_cheek': pt(366)
    }
    features = []
    eye_dist = dist(key_points['left_eye'], key_points['right_eye'])
    features.append(eye_dist)
    mouth_width = dist(key_points['mouth_left'], key_points['mouth_right'])
    features.append(mouth_width)
    nose_to_mouth_left = dist(key_points['nose_tip'], key_points['mouth_left'])
    nose_to_mouth_right = dist(key_points['nose_tip'], key_points['mouth_right'])
    features.extend([nose_to_mouth_left, nose_to_mouth_right])
    left_eye_to_nose = dist(key_points['left_eye'], key_points['nose_tip'])
    right_eye_to_nose = dist(key_points['right_eye'], key_points['nose_tip'])
    features.extend([left_eye_to_nose, right_eye_to_nose])
    # Angle at the nose tip between the directions to the two eyes.
    vec1 = np.array([key_points['left_eye'][0] - key_points['nose_tip'][0],
                     key_points['left_eye'][1] - key_points['nose_tip'][1]])
    vec2 = np.array([key_points['right_eye'][0] - key_points['nose_tip'][0],
                     key_points['right_eye'][1] - key_points['nose_tip'][1]])
    cos_angle = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2) + 1e-8)
    angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))
    features.append(angle)
    mouth_center = ((key_points['mouth_left'][0] + key_points['mouth_right'][0]) / 2,
                    (key_points['mouth_left'][1] + key_points['mouth_right'][1]) / 2)
    features.append(dist(mouth_center, key_points['left_eye']))
    features.append(dist(mouth_center, key_points['right_eye']))
    # Rough mouth aspect ratio: width relative to nose-to-corner spans.
    mouth_aspect_ratio = mouth_width / (nose_to_mouth_left + nose_to_mouth_right + 1e-8)
    features.append(mouth_aspect_ratio)
    features.append(dist(key_points['left_eyebrow'], key_points['left_eye']))
    features.append(dist(key_points['right_eyebrow'], key_points['right_eye']))
    # AU6 (cheek raiser) proxy: eyelid-to-cheek distance, averaged over sides.
    left_au6 = dist(key_points['left_lower_eyelid'], key_points['left_cheek'])
    right_au6 = dist(key_points['right_lower_eyelid'], key_points['right_cheek'])
    features.append((left_au6 + right_au6) / 2)
    # AU12 (lip corner puller) proxy: corner-to-chin distance over mouth width.
    mouth_left_to_chin = dist(key_points['mouth_left'], key_points['chin'])
    mouth_right_to_chin = dist(key_points['mouth_right'], key_points['chin'])
    features.append((mouth_left_to_chin + mouth_right_to_chin) / (2 * (mouth_width + 1e-8)))
    return np.array(features, dtype=np.float32)
# Function to get landmark mask using MediaPipe
def get_landmark_mask(image, target_size=(7, 7)):
    """Build a soft spatial-attention mask over facial key regions.

    Draws filled circles of 1.0 around salient landmarks (larger radii for
    the mouth region) on a full-resolution canvas, then downsamples to the
    HLA feature-map resolution.

    Args:
        image: RGB PIL image.
        target_size: output (H, W) of the mask; defaults to the 7x7 grid
            produced by the backbone.

    Returns:
        np.ndarray of shape ``target_size`` in [0, 1]; all ones (no spatial
        bias) when no face is detected.
    """
    image_np = np.array(image)
    h, w = image_np.shape[:2]
    # `image` is already RGB, which is what MediaPipe expects — the previous
    # RGB->BGR conversion here was incorrect and hurt detection.
    results = face_mesh.process(image_np)
    if not results.multi_face_landmarks:
        return np.ones(target_size, dtype=np.float32)
    landmarks = results.multi_face_landmarks[0].landmark
    mask = np.zeros((h, w), dtype=np.float32)
    key_indices = [
        159, 386,           # Eyes
        145, 374,           # Lower eyelids
        61, 291, 80, 310,   # Mouth
        70, 300,            # Eyebrows
        172, 397, 152,      # Jaw/Chin
        137, 366            # Cheeks
    ]
    key_points = [(landmarks[i].x * w, landmarks[i].y * h) for i in key_indices]
    for i, (x, y) in enumerate(key_points):
        # Larger footprint for the mouth points (i 4-7) and i 12-13.
        # NOTE(review): i==12 is the chin and i==14 (right cheek) gets the
        # small radius — possibly an off-by-one in the original; kept as-is
        # because the checkpoint was trained with this mask. Confirm before
        # changing.
        radius = 30 if i in (4, 5, 6, 7, 12, 13) else 20
        cv2.circle(mask, (int(x), int(y)), radius, 1.0, -1)
    mask = cv2.resize(mask, target_size, interpolation=cv2.INTER_LINEAR)
    return np.clip(mask, 0, 1)
# Model definitions
class EfficientNetBackbone(nn.Module):
    """EfficientNet-B4 feature extractor projected down to 256 channels.

    The backbone's final feature map (1792 channels) is reduced through a
    1x1 convolution plus BatchNorm to the 256 channels expected by the
    downstream streams.
    """

    def __init__(self):
        super().__init__()
        self.efficientnet = EfficientNet.from_pretrained('efficientnet-b4')
        # Replace the stem convolution; its fresh initialisation is
        # overwritten when the fine-tuned state dict is loaded later.
        self.efficientnet._conv_stem = nn.Conv2d(3, 48, kernel_size=3, stride=2, padding=1, bias=False)
        self.channel_reducer = nn.Conv2d(1792, 256, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn = nn.BatchNorm2d(256)
        nn.init.xavier_uniform_(self.channel_reducer.weight)

    def forward(self, x):
        feats = self.efficientnet.extract_features(x)
        return self.bn(self.channel_reducer(feats))
class HLA(nn.Module):
    """Hybrid Local Attention.

    Spatial attention (optionally gated by a facial-landmark mask) followed
    by squeeze-and-excite-style channel attention, with a final BatchNorm.
    """

    def __init__(self, in_channels=256, reduction=4):
        super(HLA, self).__init__()
        reduced_channels = in_channels // reduction
        self.spatial_branch1 = nn.Conv2d(in_channels, reduced_channels, 1)
        self.spatial_branch2 = nn.Conv2d(in_channels, reduced_channels, 1)
        self.sigmoid = nn.Sigmoid()
        self.channel_restore = nn.Conv2d(reduced_channels, in_channels, 1)
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(in_channels, in_channels // reduction, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(in_channels // reduction, in_channels, 1, bias=False),
            nn.Sigmoid()
        )
        self.bn = nn.BatchNorm2d(in_channels, eps=1e-5)
        self.dropout = nn.Dropout2d(0.2)

    def forward(self, x, landmark_mask=None):
        """Apply attention to feature map ``x``.

        Args:
            x: feature map of shape (B, C, 7, 7).
            landmark_mask: optional (7, 7) or (B, 7, 7) array/tensor that
                gates the spatial attention toward facial key regions.
        """
        b1 = self.spatial_branch1(x)
        b2 = self.spatial_branch2(x)
        spatial_attn = self.sigmoid(torch.max(b1, b2))
        spatial_attn = self.channel_restore(spatial_attn)
        if landmark_mask is not None:
            # as_tensor avoids copying an existing tensor and, crucially,
            # places the mask on x's device — the previous torch.tensor(...)
            # call left it on the CPU, which crashed when the model ran on a
            # GPU.
            landmark_mask = torch.as_tensor(landmark_mask, dtype=x.dtype, device=x.device)
            spatial_attn = spatial_attn * landmark_mask.view(-1, 1, 7, 7)
        spatial_attn = self.dropout(spatial_attn)
        spatial_out = x * spatial_attn
        channel_attn = self.channel_attention(spatial_out)
        channel_attn = self.dropout(channel_attn)
        out = spatial_out * channel_attn
        return self.bn(out)
class ViT(nn.Module):
    """Small Vision Transformer head over the 7x7x256 backbone feature map.

    Each 1x1 spatial location becomes one patch token; a learned [CLS] token
    is prepended and its final embedding (after LayerNorm + BatchNorm) is
    returned as the stream's 768-dim output.
    """

    def __init__(self, in_channels=256, patch_size=1, embed_dim=768, num_layers=8, num_heads=12):
        super(ViT, self).__init__()
        # 1x1 conv acts as the per-patch linear embedding.
        self.patch_embed = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        # 49 patches for the 7x7 map with patch_size=1.
        num_patches = (7 // patch_size) * (7 // patch_size)
        self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))
        self.transformer = nn.ModuleList([
            nn.TransformerEncoderLayer(embed_dim, num_heads, dim_feedforward=1536, activation="gelu")
            for _ in range(num_layers)
        ])
        self.ln = nn.LayerNorm(embed_dim)
        self.bn = nn.BatchNorm1d(embed_dim, eps=1e-5)
        nn.init.xavier_uniform_(self.patch_embed.weight)
        nn.init.zeros_(self.patch_embed.bias)
        nn.init.normal_(self.cls_token, std=0.02)
        nn.init.normal_(self.pos_embed, std=0.02)

    def forward(self, x):
        # (B, C, 7, 7) -> (B, 49, embed_dim)
        x = self.patch_embed(x)
        x = x.flatten(2).transpose(1, 2)
        cls_tokens = self.cls_token.expand(x.size(0), -1, -1)
        x = torch.cat([cls_tokens, x], dim=1)
        x = x + self.pos_embed
        # NOTE(review): tokens are laid out batch-first here, but
        # TransformerEncoderLayer defaults to batch_first=False (seq, batch,
        # embed). The checkpoint was trained with this exact layout, so it is
        # deliberately left unchanged — confirm against training code before
        # "fixing".
        for layer in self.transformer:
            x = layer(x)
        # Take the [CLS] token's embedding.
        x = x[:, 0]
        x = self.ln(x)
        x = self.bn(x)
        return x
class IntensityStream(nn.Module):
    """Texture/intensity branch: fixed Sobel gradients + CNN + self-attention.

    Produces a 256-dim descriptor (128 pooled CNN features concatenated with
    128 attention-context features), along with the raw gradient-magnitude
    map and a per-position channel variance for auxiliary use.
    """

    def __init__(self, in_channels=256):
        super(IntensityStream, self).__init__()
        # Depthwise Sobel filters, one copy per channel; these weights are
        # ultimately restored from the fine-tuned checkpoint.
        sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32)
        sobel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], dtype=torch.float32)
        self.sobel_x = nn.Conv2d(in_channels, in_channels, 3, padding=1, bias=False, groups=in_channels)
        self.sobel_y = nn.Conv2d(in_channels, in_channels, 3, padding=1, bias=False, groups=in_channels)
        self.sobel_x.weight.data = sobel_x.repeat(in_channels, 1, 1, 1)
        self.sobel_y.weight.data = sobel_y.repeat(in_channels, 1, 1, 1)
        self.conv = nn.Conv2d(in_channels, 128, 3, padding=1)
        self.bn = nn.BatchNorm2d(128, eps=1e-5)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.attention = nn.MultiheadAttention(embed_dim=128, num_heads=1)
        nn.init.xavier_uniform_(self.conv.weight)
        nn.init.zeros_(self.conv.bias)

    def forward(self, x):
        # Per-channel edge responses and their magnitude.
        gx, gy = self.sobel_x(x), self.sobel_y(x)
        grad_magnitude = torch.sqrt(gx**2 + gy**2 + 1e-8)
        # Channel-wise variance at each spatial position, flattened per sample.
        variance = ((x - x.mean(dim=1, keepdim=True))**2).mean(dim=1).flatten(1)
        cnn_out = self.bn(F.relu(self.conv(grad_magnitude)))
        # Globally pooled texture descriptor: (B, 128).
        texture_out = self.pool(cnn_out).squeeze(-1).squeeze(-1)
        # Self-attention over the 49 spatial tokens, (seq, batch, embed) layout,
        # with L2-normalised token vectors.
        tokens = cnn_out.flatten(2).permute(2, 0, 1)
        tokens = tokens / (tokens.norm(dim=-1, keepdim=True) + 1e-8)
        attended, _ = self.attention(tokens, tokens, tokens)
        context_out = attended.mean(dim=0)
        return torch.cat([texture_out, context_out], dim=1), grad_magnitude, variance
class LandmarkStream(nn.Module):
    """MLP that lifts the 14 geometric landmark features to ``embed_dim``."""

    def __init__(self, input_dim=14, embed_dim=768):
        super(LandmarkStream, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, embed_dim)
        self.bn1 = nn.BatchNorm1d(128)
        self.bn2 = nn.BatchNorm1d(256)
        self.bn3 = nn.BatchNorm1d(embed_dim)
        self.dropout = nn.Dropout(0.4)
        # Xavier init for every linear layer; biases start at zero.
        for fc in (self.fc1, self.fc2, self.fc3):
            nn.init.xavier_uniform_(fc.weight)
            nn.init.zeros_(fc.bias)

    def forward(self, x):
        hidden = self.dropout(F.relu(self.bn1(self.fc1(x))))
        hidden = self.dropout(F.relu(self.bn2(self.fc2(hidden))))
        # The final layer is batch-normalised but not activated.
        return self.bn3(self.fc3(hidden))
class QuadStreamHLAViT(nn.Module):
    """Four-stream emotion classifier.

    HLA, ViT, intensity and landmark branches operate on (or alongside) a
    shared EfficientNet feature map; each stream is projected to 768 dims,
    concatenated and fused by a single MLP classifier head.
    """

    def __init__(self, num_classes=7):
        super(QuadStreamHLAViT, self).__init__()
        self.backbone = EfficientNetBackbone()
        self.hla = HLA()
        self.vit = ViT()
        self.intensity = IntensityStream()
        self.landmark = LandmarkStream(input_dim=14, embed_dim=768)
        # Per-stream projections into the common 768-dim fusion space.
        self.fc_hla = nn.Linear(256*7*7, 768)
        self.fc_intensity = nn.Linear(256, 768)
        self.fusion_fc = nn.Linear(768*4, 512)
        self.bn_fusion = nn.BatchNorm1d(512, eps=1e-5)
        self.dropout = nn.Dropout(0.6)
        self.classifier = nn.Linear(512, num_classes)
        for layer in (self.fc_hla, self.fc_intensity, self.fusion_fc, self.classifier):
            nn.init.xavier_uniform_(layer.weight)
            nn.init.zeros_(layer.bias)

    def forward(self, x, landmark_features, landmark_mask=None):
        """Return (logits, hla_out, vit_out, grad_magnitude, variance)."""
        shared = self.backbone(x)
        hla_out = self.hla(shared, landmark_mask)
        vit_out = self.vit(shared)
        intensity_out, grad_magnitude, variance = self.intensity(shared)
        landmark_out = self.landmark(landmark_features)
        projected_hla = self.fc_hla(hla_out.view(-1, 256*7*7))
        projected_intensity = self.fc_intensity(intensity_out)
        fused = torch.cat([projected_hla, vit_out, projected_intensity, landmark_out], dim=1)
        fused = self.dropout(self.bn_fusion(F.relu(self.fusion_fc(fused))))
        return self.classifier(fused), hla_out, vit_out, grad_magnitude, variance
# Instantiate the architecture and restore the fine-tuned RAF-DB weights.
model = QuadStreamHLAViT(num_classes=7)
try:
    state_dict = torch.load(MODEL_WEIGHTS_PATH, map_location=torch.device('cpu'), weights_only=True)
    model.load_state_dict(state_dict)
    print("Model weights loaded successfully.")
except Exception as e:
    print(f"Error loading model weights: {e}")
    raise RuntimeError("Failed to load model weights.")
# Inference-only: freeze BatchNorm/Dropout behaviour.
model.eval()
# Inference function
def predict_emotion(image):
    """Gradio handler: classify the facial emotion in an uploaded image.

    Returns:
        A (label, probabilities-dict) pair. On any failure the pair
        ("Error", {"Message": ...}) is returned instead of raising, so the
        UI stays responsive.
    """
    try:
        # Normalise the input to an RGB PIL image.
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        image = image.convert("RGB")
        # Landmark-derived inputs: features for the landmark stream and a
        # spatial mask for the HLA stream.
        lm_features = extract_landmark_features(image)
        lm_mask = get_landmark_mask(image)
        img_tensor = transform(image).unsqueeze(0)
        lm_features_tensor = torch.tensor(lm_features, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            outputs, _, _, _, _ = model(img_tensor, lm_features_tensor, lm_mask)
            probs = F.softmax(outputs, dim=1)[0]
        pred_label = torch.argmax(probs).item()
        # Per-class probabilities formatted to 4 decimal places for display.
        prob_dict = {class_mapping[i]: f"{probs[i].item():.4f}" for i in range(len(class_mapping))}
        return class_mapping[pred_label], prob_dict
    except Exception as e:
        return "Error", {"Message": f"Failed to process image: {str(e)}"}
# Gradio interface.  `allow_flagging` was renamed to `flagging_mode` in
# Gradio 5 and the old keyword was removed, making gr.Interface raise a
# TypeError at start-up on current Spaces images (a likely cause of the
# Space's build error).  Try the new keyword first, fall back for older
# Gradio versions.
_iface_kwargs = dict(
    fn=predict_emotion,
    inputs=gr.Image(type="pil", label="Upload an Image"),
    outputs=[
        gr.Textbox(label="Predicted Emotion"),
        gr.JSON(label="Emotion Probabilities")
    ],
    title="Facial Emotion Recognition with QuadStreamHLAViT",
    description="Upload an image to predict facial emotions (Surprise, Fear, Disgust, Happiness, Sadness, Anger, Neutral) using a QuadStreamHLAViT model trained on RAF-DB. Model accuracy: 82.31%.",
)
try:
    iface = gr.Interface(flagging_mode="never", **_iface_kwargs)
except TypeError:
    iface = gr.Interface(allow_flagging="never", **_iface_kwargs)
import atexit

def cleanup():
    """Release MediaPipe's native FaceMesh resources on interpreter exit."""
    face_mesh.close()

atexit.register(cleanup)

# Launch the Gradio app when run as a script.
if __name__ == "__main__":
    iface.launch()