# SakibRumu
# Update app.py
# 403a567 verified
import gradio as gr
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import numpy as np
from PIL import Image
import cv2
import mediapipe as mp
import os
import requests
from efficientnet_pytorch import EfficientNet
# Define paths and URLs
# MODEL_WEIGHTS_URL is the remote checkpoint fetched by download_model_weights();
# MODEL_WEIGHTS_PATH is the local file it is saved to and later loaded from.
MODEL_WEIGHTS_URL = "https://huggingface.co/Sakibrumu/Quad_Stream_Face_Emotion_Classifier/resolve/main/quad_stream_model_rafdb.pth"
MODEL_WEIGHTS_PATH = "best_model.pth"
# Download model weights from Hugging Face Model Hub
def download_model_weights():
    """Fetch the model checkpoint to ``MODEL_WEIGHTS_PATH`` unless it already exists.

    The file is streamed into a temporary ``.part`` path and only renamed onto
    ``MODEL_WEIGHTS_PATH`` after the transfer has completed, so an interrupted
    download cannot leave a truncated checkpoint that a later run would
    mistake for a valid cached copy.

    Raises:
        RuntimeError: If the download fails for any reason. The underlying
            exception is attached as ``__cause__``.
    """
    if os.path.exists(MODEL_WEIGHTS_PATH):
        print("Model weights already exist locally.")
        return

    print(f"Downloading model weights from {MODEL_WEIGHTS_URL}...")
    partial_path = MODEL_WEIGHTS_PATH + ".part"
    try:
        # The context manager releases the streamed connection even when the
        # transfer fails part-way through.
        with requests.get(MODEL_WEIGHTS_URL, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        # Publish the finished file in a single rename step.
        os.replace(partial_path, MODEL_WEIGHTS_PATH)
        print("Model weights downloaded successfully.")
    except Exception as e:
        print(f"Failed to download model weights: {e}")
        # Best-effort removal of the incomplete file; it may not exist at all
        # if the request failed before anything was written.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        raise RuntimeError("Model weights download failed.") from e
# Make sure the checkpoint is on disk before the model is built further down.
download_model_weights()
# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    # Every request is an unrelated still image, so run face detection on each
    # call instead of tracking landmarks from the previously processed image
    # (the default, video-oriented mode).
    static_image_mode=True,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    # Kept for parity with the original configuration; per the MediaPipe docs
    # this threshold is ignored when static_image_mode is enabled.
    min_tracking_confidence=0.5
)
# Class mapping for RAF-DB: model output index -> emotion label shown in the UI.
# predict_emotion() indexes this with the argmax of the logits, so the order
# has to match the label order the checkpoint was trained with.
class_mapping = {
    0: "Surprise",
    1: "Fear",
    2: "Disgust",
    3: "Happiness",
    4: "Sadness",
    5: "Anger",
    6: "Neutral"
}
# Transform for input images: resize to 224x224, convert to a float tensor,
# then normalize each channel with the mean/std below.
# NOTE(review): these look like the usual ImageNet statistics - presumably the
# same preprocessing as training; confirm.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Function to extract landmark features using MediaPipe
def extract_landmark_features(image):
    """Compute the 14 geometric face features consumed by the landmark stream.

    Args:
        image: RGB image (PIL image or ``H x W x 3`` array). ``predict_emotion``
            converts its input to RGB before calling this function.

    Returns:
        ``np.ndarray`` of shape ``(14,)`` and dtype ``float32``; all zeros when
        MediaPipe detects no face. Distances are in pixels of ``image``.
        Order of the entries:

        0. eye-to-eye distance
        1. mouth width
        2-3. nose tip to left / right mouth corner
        4-5. left / right eye to nose tip
        6. angle (radians) at the nose tip between the two eyes
        7-8. mouth centre to left / right eye
        9. mouth width / (sum of nose-to-mouth-corner distances)
        10-11. left / right eyebrow to eye
        12. mean lower-eyelid-to-cheek distance
        13. mean mouth-corner-to-chin distance / mouth width
    """
    image_np = np.array(image)
    h, w = image_np.shape[:2]
    # FaceMesh.process() expects RGB input and the image is already RGB, so it
    # is passed as-is; converting RGB->BGR here would swap the red and blue
    # channels seen by the detector.
    results = face_mesh.process(image_np)
    if not results.multi_face_landmarks:
        return np.zeros(14, dtype=np.float32)
    landmarks = results.multi_face_landmarks[0].landmark

    def point(index):
        # Scale the landmark's x / y by image width / height to get pixels.
        lm = landmarks[index]
        return (lm.x * w, lm.y * h)

    def dist(a, b):
        # Euclidean distance between two (x, y) points.
        return np.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)

    # MediaPipe indices chosen to approximate the dlib landmarks.
    left_eye = point(159)
    right_eye = point(386)
    nose_tip = point(1)
    mouth_left = point(61)
    mouth_right = point(291)
    left_eyebrow = point(70)
    right_eyebrow = point(300)
    chin = point(152)
    left_lower_eyelid = point(145)
    right_lower_eyelid = point(374)
    left_cheek = point(137)
    right_cheek = point(366)

    eye_dist = dist(left_eye, right_eye)
    mouth_width = dist(mouth_left, mouth_right)
    nose_to_mouth_left = dist(nose_tip, mouth_left)
    nose_to_mouth_right = dist(nose_tip, mouth_right)
    left_eye_to_nose = dist(left_eye, nose_tip)
    right_eye_to_nose = dist(right_eye, nose_tip)

    # Angle at the nose tip spanned by the two eyes; the epsilon guards the
    # division and the clip keeps arccos inside its domain.
    vec1 = np.array([left_eye[0] - nose_tip[0], left_eye[1] - nose_tip[1]])
    vec2 = np.array([right_eye[0] - nose_tip[0], right_eye[1] - nose_tip[1]])
    cos_angle = np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2) + 1e-8)
    angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))

    mouth_center = ((mouth_left[0] + mouth_right[0]) / 2,
                    (mouth_left[1] + mouth_right[1]) / 2)
    mouth_to_left_eye = dist(mouth_center, left_eye)
    mouth_to_right_eye = dist(mouth_center, right_eye)
    mouth_aspect_ratio = mouth_width / (nose_to_mouth_left + nose_to_mouth_right + 1e-8)

    left_eyebrow_to_eye = dist(left_eyebrow, left_eye)
    right_eyebrow_to_eye = dist(right_eyebrow, right_eye)

    # Named after action units in the original code: AU6 from eyelid-to-cheek
    # distances, AU12 from mouth-corner-to-chin distances over mouth width.
    avg_au6 = (dist(left_lower_eyelid, left_cheek) + dist(right_lower_eyelid, right_cheek)) / 2
    avg_au12 = (dist(mouth_left, chin) + dist(mouth_right, chin)) / (2 * (mouth_width + 1e-8))

    features = [
        eye_dist,
        mouth_width,
        nose_to_mouth_left,
        nose_to_mouth_right,
        left_eye_to_nose,
        right_eye_to_nose,
        angle,
        mouth_to_left_eye,
        mouth_to_right_eye,
        mouth_aspect_ratio,
        left_eyebrow_to_eye,
        right_eyebrow_to_eye,
        avg_au6,
        avg_au12,
    ]
    return np.array(features, dtype=np.float32)
# Function to get landmark mask using MediaPipe
def get_landmark_mask(image, target_size=(7, 7)):
    """Build a coarse attention mask highlighting key facial landmarks.

    Filled circles are drawn around selected landmarks on a full-resolution
    canvas, which is then resized down to ``target_size``.

    Args:
        image: RGB image (PIL image or ``H x W x 3`` array).
        target_size: Size passed to ``cv2.resize`` and to ``np.ones`` for the
            no-face fallback. Defaults to ``(7, 7)``.

    Returns:
        ``float32`` array with values in ``[0, 1]``. All ones (no masking) when
        MediaPipe detects no face.
    """
    image_np = np.array(image)
    h, w = image_np.shape[:2]
    # FaceMesh.process() expects RGB input and the image is already RGB, so it
    # is passed as-is; converting RGB->BGR here would swap the red and blue
    # channels seen by the detector.
    results = face_mesh.process(image_np)
    if not results.multi_face_landmarks:
        return np.ones(target_size, dtype=np.float32)
    landmarks = results.multi_face_landmarks[0].landmark
    mask = np.zeros((h, w), dtype=np.float32)
    key_indices = [
        159, 386,           # Eyes
        145, 374,           # Lower eyelids
        61, 291, 80, 310,   # Mouth
        70, 300,            # Eyebrows
        172, 397, 152,      # Jaw/Chin
        137, 366            # Cheeks
    ]
    # Positions within key_indices that are drawn with the larger radius.
    # NOTE(review): positions 12 and 13 are the chin (152) and the left cheek
    # (137); the right cheek (position 14) gets the small radius. This looks
    # asymmetric but is kept as-is, since the checkpoint was presumably trained
    # with masks produced this way - confirm before changing.
    large_radius_positions = {4, 5, 6, 7, 12, 13}
    for position, landmark_index in enumerate(key_indices):
        x = landmarks[landmark_index].x * w
        y = landmarks[landmark_index].y * h
        radius = 30 if position in large_radius_positions else 20
        # Thickness -1 draws a filled circle.
        cv2.circle(mask, (int(x), int(y)), radius, 1.0, -1)
    mask = cv2.resize(mask, target_size, interpolation=cv2.INTER_LINEAR)
    return np.clip(mask, 0, 1)
# Model definitions (unchanged)
class EfficientNetBackbone(nn.Module):
    """EfficientNet-B4 feature extractor followed by a 1x1 projection to 256 channels."""

    def __init__(self):
        super().__init__()
        self.efficientnet = EfficientNet.from_pretrained('efficientnet-b4')
        # Replace the library's stem with a plain 3x3, stride-2 convolution.
        self.efficientnet._conv_stem = nn.Conv2d(
            3, 48, kernel_size=3, stride=2, padding=1, bias=False
        )
        # 1x1 convolution reducing the 1792 extracted channels to 256.
        self.channel_reducer = nn.Conv2d(
            1792, 256, kernel_size=1, stride=1, padding=0, bias=False
        )
        self.bn = nn.BatchNorm2d(256)
        nn.init.xavier_uniform_(self.channel_reducer.weight)

    def forward(self, x):
        """Return batch-normalized 256-channel feature maps for image batch ``x``."""
        extracted = self.efficientnet.extract_features(x)
        reduced = self.channel_reducer(extracted)
        return self.bn(reduced)
class HLA(nn.Module):
    """Attention block with a two-branch spatial gate and a channel gate.

    The spatial gate takes the element-wise maximum of two 1x1-conv branches,
    applies a sigmoid and restores the channel count; it can optionally be
    multiplied by an external landmark mask. The channel gate is computed from
    the spatially gated features (global average pool -> bottleneck 1x1 convs
    -> sigmoid). The result is batch-normalized.

    Args:
        in_channels: Number of input (and output) channels.
        reduction: Channel reduction factor used by both gates.
    """

    def __init__(self, in_channels=256, reduction=4):
        super().__init__()
        reduced_channels = in_channels // reduction
        self.spatial_branch1 = nn.Conv2d(in_channels, reduced_channels, 1)
        self.spatial_branch2 = nn.Conv2d(in_channels, reduced_channels, 1)
        self.sigmoid = nn.Sigmoid()
        self.channel_restore = nn.Conv2d(reduced_channels, in_channels, 1)
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(in_channels, in_channels // reduction, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(in_channels // reduction, in_channels, 1, bias=False),
            nn.Sigmoid()
        )
        self.bn = nn.BatchNorm2d(in_channels, eps=1e-5)
        self.dropout = nn.Dropout2d(0.2)

    def forward(self, x, landmark_mask=None):
        """Apply spatial then channel attention to ``x``.

        Args:
            x: Feature maps of shape ``(batch, in_channels, H, W)``.
            landmark_mask: Optional array or tensor with ``H * W`` values per
                sample (e.g. shape ``(H, W)`` or ``(batch, H, W)``). It is
                multiplied into the spatial attention map, broadcasting over
                channels (and over the batch when a single mask is given).

        Returns:
            Tensor with the same shape as ``x``.
        """
        b1 = self.spatial_branch1(x)
        b2 = self.spatial_branch2(x)
        spatial_attn = self.sigmoid(torch.max(b1, b2))
        spatial_attn = self.channel_restore(spatial_attn)
        if landmark_mask is not None:
            # as_tensor accepts both numpy arrays and tensors, and placing the
            # mask on x's device keeps this working when the model is on GPU.
            landmark_mask = torch.as_tensor(landmark_mask, dtype=x.dtype, device=x.device)
            # Match the feature-map size instead of assuming 7x7.
            landmark_mask = landmark_mask.reshape(-1, 1, *x.shape[-2:])
            spatial_attn = spatial_attn * landmark_mask
        spatial_attn = self.dropout(spatial_attn)
        spatial_out = x * spatial_attn
        channel_attn = self.channel_attention(spatial_out)
        channel_attn = self.dropout(channel_attn)
        out = spatial_out * channel_attn
        out = self.bn(out)
        return out
class ViT(nn.Module):
    """Transformer encoder over patch embeddings of a feature map.

    A strided convolution embeds the patches, a learnable class token is
    prepended, learnable position embeddings are added, and the class-token
    output is returned after LayerNorm and BatchNorm1d.

    Args:
        in_channels: Channels of the incoming feature map.
        patch_size: Kernel size and stride of the patch-embedding convolution.
        embed_dim: Token embedding width.
        num_layers: Number of encoder layers.
        num_heads: Attention heads per encoder layer.
    """

    def __init__(self, in_channels=256, patch_size=1, embed_dim=768, num_layers=8, num_heads=12):
        super().__init__()
        self.patch_embed = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        # The position table is sized for a 7x7 input feature map.
        patches_per_side = 7 // patch_size
        num_patches = patches_per_side * patches_per_side
        self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))
        # NOTE(review): nn.TransformerEncoderLayer defaults to batch_first=False,
        # while forward() feeds tokens laid out as (batch, tokens, embed). As
        # written, attention appears to run along the batch axis rather than
        # across patches. Left unchanged because the published checkpoint was
        # presumably trained with this layout - confirm before changing.
        encoder_layers = []
        for _ in range(num_layers):
            encoder_layers.append(
                nn.TransformerEncoderLayer(embed_dim, num_heads, dim_feedforward=1536, activation="gelu")
            )
        self.transformer = nn.ModuleList(encoder_layers)
        self.ln = nn.LayerNorm(embed_dim)
        self.bn = nn.BatchNorm1d(embed_dim, eps=1e-5)
        nn.init.xavier_uniform_(self.patch_embed.weight)
        nn.init.zeros_(self.patch_embed.bias)
        nn.init.normal_(self.cls_token, std=0.02)
        nn.init.normal_(self.pos_embed, std=0.02)

    def forward(self, x):
        """Return the normalized class-token embedding, shape ``(batch, embed_dim)``."""
        # (B, C, H, W) -> (B, H*W, embed_dim)
        tokens = self.patch_embed(x).flatten(2).transpose(1, 2)
        batch_size = tokens.size(0)
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)
        tokens = torch.cat([cls_tokens, tokens], dim=1) + self.pos_embed
        for encoder_layer in self.transformer:
            tokens = encoder_layer(tokens)
        cls_out = tokens[:, 0]
        return self.bn(self.ln(cls_out))
class IntensityStream(nn.Module):
    """Gradient-magnitude stream: Sobel-initialized filters, a conv, and attention.

    Depthwise 3x3 convolutions initialized with Sobel kernels produce a
    gradient-magnitude map, which is convolved to 128 channels. The output
    concatenates a globally pooled descriptor with the mean of a
    single-head self-attention pass over the spatial positions.

    Args:
        in_channels: Channels of the incoming feature map.
    """

    def __init__(self, in_channels=256):
        super().__init__()
        kernel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32)
        kernel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], dtype=torch.float32)
        # groups=in_channels makes these depthwise: one 3x3 filter per channel.
        self.sobel_x = nn.Conv2d(in_channels, in_channels, 3, padding=1, bias=False, groups=in_channels)
        self.sobel_y = nn.Conv2d(in_channels, in_channels, 3, padding=1, bias=False, groups=in_channels)
        # Start every channel's filter from the Sobel kernel.
        self.sobel_x.weight.data = kernel_x.repeat(in_channels, 1, 1, 1)
        self.sobel_y.weight.data = kernel_y.repeat(in_channels, 1, 1, 1)
        self.conv = nn.Conv2d(in_channels, 128, 3, padding=1)
        self.bn = nn.BatchNorm2d(128, eps=1e-5)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.attention = nn.MultiheadAttention(embed_dim=128, num_heads=1)
        nn.init.xavier_uniform_(self.conv.weight)
        nn.init.zeros_(self.conv.bias)

    def forward(self, x):
        """Return ``(features, grad_magnitude, variance)`` for feature maps ``x``.

        ``features`` is ``(batch, 256)``, ``grad_magnitude`` has the shape of
        ``x``, and ``variance`` is the per-position variance across channels
        flattened to ``(batch, H * W)``.
        """
        grad_x = self.sobel_x(x)
        grad_y = self.sobel_y(x)
        # The epsilon keeps the square root differentiable at zero gradient.
        grad_magnitude = torch.sqrt(grad_x**2 + grad_y**2 + 1e-8)

        channel_mean = x.mean(dim=1, keepdim=True)
        variance = ((x - channel_mean)**2).mean(dim=1).flatten(1)

        cnn_out = self.bn(F.relu(self.conv(grad_magnitude)))
        texture_out = self.pool(cnn_out).squeeze(-1).squeeze(-1)

        # (B, C, H, W) -> (H*W, B, C), the sequence-first layout expected by
        # nn.MultiheadAttention; each token is L2-normalized first.
        tokens = cnn_out.flatten(2).permute(2, 0, 1)
        tokens = tokens / (tokens.norm(dim=-1, keepdim=True) + 1e-8)
        attended, _ = self.attention(tokens, tokens, tokens)
        context_out = attended.mean(dim=0)

        return torch.cat([texture_out, context_out], dim=1), grad_magnitude, variance
class LandmarkStream(nn.Module):
    """Three-layer MLP that embeds the landmark feature vector.

    Layer widths are ``input_dim -> 128 -> 256 -> embed_dim``; each linear
    layer is followed by BatchNorm1d, with ReLU and dropout after the first
    two.

    Args:
        input_dim: Length of the landmark feature vector.
        embed_dim: Width of the returned embedding.
    """

    def __init__(self, input_dim=14, embed_dim=768):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, embed_dim)
        self.bn1 = nn.BatchNorm1d(128)
        self.bn2 = nn.BatchNorm1d(256)
        self.bn3 = nn.BatchNorm1d(embed_dim)
        self.dropout = nn.Dropout(0.4)
        # Xavier weights and zero biases for every linear layer, in order.
        for linear in (self.fc1, self.fc2, self.fc3):
            nn.init.xavier_uniform_(linear.weight)
            nn.init.zeros_(linear.bias)

    def forward(self, x):
        """Embed ``x`` of shape ``(batch, input_dim)`` to ``(batch, embed_dim)``."""
        hidden = self.dropout(F.relu(self.bn1(self.fc1(x))))
        hidden = self.dropout(F.relu(self.bn2(self.fc2(hidden))))
        return self.bn3(self.fc3(hidden))
class QuadStreamHLAViT(nn.Module):
    """Four-stream emotion classifier built on shared backbone features.

    The HLA, ViT and intensity streams read the backbone feature maps; the
    landmark stream reads the geometric feature vector. Each stream is brought
    to 768 dimensions, the four are concatenated, fused to 512 dimensions and
    classified.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes=7):
        super().__init__()
        self.backbone = EfficientNetBackbone()
        self.hla = HLA()
        self.vit = ViT()
        self.intensity = IntensityStream()
        self.landmark = LandmarkStream(input_dim=14, embed_dim=768)
        # Projections that bring the HLA and intensity streams to 768 dims.
        self.fc_hla = nn.Linear(256*7*7, 768)
        self.fc_intensity = nn.Linear(256, 768)
        self.fusion_fc = nn.Linear(768*4, 512)
        self.bn_fusion = nn.BatchNorm1d(512, eps=1e-5)
        self.dropout = nn.Dropout(0.6)
        self.classifier = nn.Linear(512, num_classes)
        # Xavier weights and zero biases for every linear layer defined here.
        for linear in (self.fc_hla, self.fc_intensity, self.fusion_fc, self.classifier):
            nn.init.xavier_uniform_(linear.weight)
            nn.init.zeros_(linear.bias)

    def forward(self, x, landmark_features, landmark_mask=None):
        """Run all four streams and classify.

        Args:
            x: Image batch passed to the backbone.
            landmark_features: Landmark feature vectors for the landmark stream.
            landmark_mask: Optional mask forwarded to the HLA block.

        Returns:
            ``(logits, hla_out, vit_out, grad_magnitude, variance)``.
        """
        features = self.backbone(x)

        hla_out = self.hla(features, landmark_mask)
        vit_out = self.vit(features)
        intensity_out, grad_magnitude, variance = self.intensity(features)
        landmark_out = self.landmark(landmark_features)

        # The flatten below requires 256 x 7 x 7 HLA feature maps.
        hla_flat = self.fc_hla(hla_out.view(-1, 256*7*7))
        intensity_flat = self.fc_intensity(intensity_out)

        streams = [hla_flat, vit_out, intensity_flat, landmark_out]
        fused = F.relu(self.fusion_fc(torch.cat(streams, dim=1)))
        fused = self.dropout(self.bn_fusion(fused))
        logits = self.classifier(fused)
        return logits, hla_out, vit_out, grad_magnitude, variance
# Load model: build the architecture, then restore the checkpoint from
# MODEL_WEIGHTS_PATH onto the CPU. Any failure aborts start-up.
model = QuadStreamHLAViT(num_classes=7)
try:
    # weights_only=True restricts unpickling to tensors and plain containers,
    # so a tampered checkpoint cannot execute arbitrary code on load.
    model.load_state_dict(torch.load(MODEL_WEIGHTS_PATH, map_location=torch.device('cpu'), weights_only=True))
    print("Model weights loaded successfully.")
except Exception as e:
    print(f"Error loading model weights: {e}")
    raise RuntimeError("Failed to load model weights.")
# Inference only: eval() puts dropout and batch-norm layers into inference mode.
model.eval()
# Inference function
def predict_emotion(image):
    """Classify the facial emotion in a single uploaded image.

    Args:
        image: PIL image or ``numpy`` array from the Gradio input, or ``None``
            when the user submitted without uploading anything.

    Returns:
        A ``(label, details)`` tuple. On success ``label`` is the predicted
        emotion name and ``details`` maps every emotion name to its softmax
        probability formatted with four decimals. On failure ``label`` is
        ``"Error"`` and ``details`` is ``{"Message": <reason>}``.
    """
    # Gradio passes None when no image was uploaded; report that directly
    # instead of surfacing an AttributeError message from the code below.
    if image is None:
        return "Error", {"Message": "No image provided. Please upload an image."}
    try:
        # Convert image to RGB
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        image = image.convert("RGB")
        # Extract landmarks and mask
        # NOTE(review): both are computed on the image at its uploaded
        # resolution, so pixel distances and circle radii scale with image
        # size - confirm this matches how the training features were built.
        lm_features = extract_landmark_features(image)
        lm_mask = get_landmark_mask(image)
        # Transform image and add the batch dimension
        img_tensor = transform(image).unsqueeze(0)
        lm_features_tensor = torch.tensor(lm_features, dtype=torch.float32).unsqueeze(0)
        # Run inference
        with torch.no_grad():
            outputs, _, _, _, _ = model(img_tensor, lm_features_tensor, lm_mask)
            probs = F.softmax(outputs, dim=1)[0]
        pred_label = torch.argmax(probs).item()
        pred_emotion = class_mapping[pred_label]
        # Format probabilities
        prob_dict = {class_mapping[i]: f"{probs[i].item():.4f}" for i in range(len(class_mapping))}
        return pred_emotion, prob_dict
    except Exception as e:
        # UI boundary: show the failure in the output widgets rather than raising.
        return "Error", {"Message": f"Failed to process image: {str(e)}"}
# Gradio interface: one image input handled by predict_emotion, whose two
# return values fill the textbox (predicted label) and the JSON panel
# (per-class probabilities, or an error message).
iface = gr.Interface(
    fn=predict_emotion,
    # type="pil" makes Gradio hand the upload to predict_emotion as a PIL image.
    inputs=gr.Image(type="pil", label="Upload an Image"),
    outputs=[
        gr.Textbox(label="Predicted Emotion"),
        gr.JSON(label="Emotion Probabilities")
    ],
    title="Facial Emotion Recognition with QuadStreamHLAViT",
    description="Upload an image to predict facial emotions (Surprise, Fear, Disgust, Happiness, Sadness, Anger, Neutral) using a QuadStreamHLAViT model trained on RAF-DB. Model accuracy: 82.31%.",
    allow_flagging="never"
)
# Clean up MediaPipe
import atexit


# atexit.register returns the function it is given, so it can be applied as a
# decorator: cleanup stays a normal module-level function and is also
# registered to run at interpreter exit.
@atexit.register
def cleanup():
    """Close the shared MediaPipe FaceMesh instance."""
    face_mesh.close()
# Launch the app only when this file is run as a script; importing the module
# still downloads the weights and builds the model and interface above, but
# does not start the server.
if __name__ == "__main__":
    iface.launch()