import cv2
import torch
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
import time
import os
import json
from typing import Dict, List, Any
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse, HTMLResponse
import uuid
from pathlib import Path
import gradio as gr
import tempfile

app = FastAPI()

# Global variable to store the history of largest face detections
largest_face_detections = []

# EmotionCNN model definition
class EmotionCNN(torch.nn.Module):
    def __init__(self, num_classes=7):
        super(EmotionCNN, self).__init__()
        # First convolutional block
        self.conv1 = torch.nn.Sequential(
            torch.nn.Conv2d(1, 64, kernel_size=3, padding=1),
            torch.nn.BatchNorm2d(64),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # Second convolutional block
        self.conv2 = torch.nn.Sequential(
            torch.nn.Conv2d(64, 128, kernel_size=3, padding=1),
            torch.nn.BatchNorm2d(128),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # Third convolutional block
        self.conv3 = torch.nn.Sequential(
            torch.nn.Conv2d(128, 256, kernel_size=3, padding=1),
            torch.nn.BatchNorm2d(256),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # Fourth convolutional block
        self.conv4 = torch.nn.Sequential(
            torch.nn.Conv2d(256, 512, kernel_size=3, padding=1),
            torch.nn.BatchNorm2d(512),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # Fifth convolutional block with residual connection
        self.conv5 = torch.nn.Sequential(
            torch.nn.Conv2d(512, 512, kernel_size=3, padding=1),
            torch.nn.BatchNorm2d(512),
            torch.nn.ReLU()
        )
        # Attention mechanism
        self.attention = torch.nn.Sequential(
            torch.nn.Conv2d(512, 1, kernel_size=1),
            torch.nn.Sigmoid()
        )
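        # Note: the flattened feature size used below (512 * 3 * 3) assumes 48x48
        # grayscale inputs; the four 2x2 max-pools reduce 48 -> 24 -> 12 -> 6 -> 3.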
        # Fully connected layers
        self.fc = torch.nn.Sequential(
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512 * 3 * 3, 1024),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(1024, 512),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        # Fifth conv block with residual connection
        x_res = x
        x = self.conv5(x)
        x = x + x_res
        # Apply attention
        attn = self.attention(x)
        x = x * attn
        # Flatten
        x = x.view(x.size(0), -1)
        # Fully connected
        x = self.fc(x)
        return x
def load_emotion_model(model_path, device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Load the emotion recognition model"""
    checkpoint = torch.load(model_path, map_location=device)
    model = EmotionCNN(num_classes=7)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()
    return model

def preprocess_face(face_img, size=(48, 48)):
    """Preprocess face image for emotion detection"""
    transform = transforms.Compose([
        transforms.Resize(size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])
    ])
    # Convert to PIL Image
    if isinstance(face_img, np.ndarray):
        face_img = Image.fromarray(cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB))
    # Convert to grayscale
    face_img = face_img.convert('L')
    # Apply transformations
    face_tensor = transform(face_img).unsqueeze(0)
    return face_tensor
def process_video(video_path: str) -> Dict[str, Any]:
    """
    Process a video file and return emotion detection results.

    Args:
        video_path (str): Path to the video file

    Returns:
        Dict containing:
            - success (bool): Whether processing was successful
            - message (str): Status message
            - results (Dict): Aggregated emotion results (average scores and dominant emotion)
            - error (str): Error message if any
    """
    global largest_face_detections
    largest_face_detections = []  # Reset detections for new video

    # Paths - adjust these paths according to your Hugging Face Space
    face_cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    emotion_model_path = "./models/best_emotion_model.pth"  # Path in Hugging Face Space

    # Check if models exist
    if not os.path.exists(face_cascade_path):
        return {
            "success": False,
            "message": "Face cascade classifier not found",
            "results": [],
            "error": f"Error: Face cascade classifier not found at {face_cascade_path}"
        }
    if not os.path.exists(emotion_model_path):
        return {
            "success": False,
            "message": "Emotion model not found",
            "results": [],
            "error": f"Error: Emotion model not found at {emotion_model_path}"
        }

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load models
    try:
        face_cascade = cv2.CascadeClassifier(face_cascade_path)
        emotion_model = load_emotion_model(emotion_model_path, device)
    except Exception as e:
        return {
            "success": False,
            "message": "Error loading models",
            "results": [],
            "error": str(e)
        }

    # Emotion labels
    emotions = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
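    # NOTE: this label order must match the class-index order used to train the
    # checkpoint; the standard FER-2013 ordering is assumed here.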
    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return {
            "success": False,
            "message": "Could not open video file",
            "results": [],
            "error": f"Error: Could not open video file at {video_path}"
        }

    frame_count = 0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1

        # Variables to track largest face
        largest_face_area = 0
        current_detection = None

        # Convert frame to grayscale for face detection
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Detect faces using Haar Cascade
        faces = face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30)
        )

        # Process each detected face
        for (x, y, w, h) in faces:
            # Calculate face area
            face_area = w * h

            # Extract face region with margin
            margin = 20
            x1 = max(0, x - margin)
            y1 = max(0, y - margin)
            x2 = min(frame.shape[1], x + w + margin)
            y2 = min(frame.shape[0], y + h + margin)
            face_img = frame[y1:y2, x1:x2]

            # Skip if face is too small
            if face_img.size == 0 or face_img.shape[0] < 20 or face_img.shape[1] < 20:
                continue

            # Convert face to PIL Image and preprocess
            face_tensor = preprocess_face(face_img)

            # Predict emotion
            with torch.no_grad():
                face_tensor = face_tensor.to(device)
                output = emotion_model(face_tensor)
                probabilities = torch.nn.functional.softmax(output, dim=1)
                emotion_idx = torch.argmax(output, dim=1).item()
                confidence = probabilities[0][emotion_idx].item()

            # Get emotion label
            emotion = emotions[emotion_idx]

            # Update largest face if current face is larger
            if face_area > largest_face_area:
                largest_face_area = face_area
                current_detection = {
                    'emotion': emotion,
                    'confidence': confidence,
                    'timestamp': time.time(),
                    'frame_number': frame_count
                }

        # Add current detection to history if a face was detected
        if current_detection:
            largest_face_detections.append(current_detection)

    # Release resources
    cap.release()

    # Process results
    if not largest_face_detections:
        return {
            "success": True,
            "message": "No faces detected in video",
            "results": {
                "average_emotions": {},
                "dominant_emotion": None,
                "detections": [],
                "summary": {
                    "total_frames": total_frames,
                    "total_detections": 0
                }
            },
            "error": None
        }

    emotion_scores = {e: [] for e in emotions}  # Initialize with all emotion types
    for detection in largest_face_detections:
        emotion = detection['emotion']
        confidence = detection['confidence']
        emotion_scores[emotion].append(confidence)

    # Calculate summary statistics
    average_emotions = {
        e: sum(scores) / len(scores) if scores else 0
        for e, scores in emotion_scores.items()
    }

    # Get dominant emotion based on average confidence
    dominant_emotion = max(average_emotions.items(), key=lambda x: x[1])[0]

    return {
        "success": True,
        "message": "Video processed successfully",
        "results": {
            "average_emotions": average_emotions,
            "dominant_emotion": dominant_emotion,
            # "detections": largest_face_detections,  # Optional: include all detections
            # "summary": {
            #     "total_frames": total_frames,
            #     "total_detections": len(largest_face_detections),
            #     "emotions_count": {e: len(s) for e, s in emotion_scores.items()},
            #     "dominant_emotion": dominant_emotion
            # }
        },
        "error": None
    }
# Gradio Interface Functions
def gradio_analyze_video(video_path: str):
    """Wrapper function for Gradio interface"""
    result = process_video(video_path)
    if not result["success"]:
        return {"error": result.get("error", "Processing failed")}

    # Format results for better Gradio display.
    # "summary" and "detections" are only present if re-enabled in process_video,
    # so use .get() to avoid a KeyError.
    summary = result["results"].get("summary", {})
    detections = result["results"].get("detections", [])
    # output = {
    #     "summary": {
    #         "total_frames": summary["total_frames"],
    #         "faces_detected": summary["total_detections"],
    #         "dominant_emotion": summary["dominant_emotion"],
    #         "emotion_distribution": summary["emotions_count"]
    #     },
    #     "sample_detections": detections[:5]  # Show first 5 detections
    # }
    # return output

    output = {
        "average_emotions": result["results"]["average_emotions"],
        "dominant_emotion": result["results"]["dominant_emotion"],
        # "frames_analyzed": result["results"]["summary"]["total_frames"],
        # "faces_detected": result["results"]["summary"]["total_detections"]
    }
    return output
def save_upload_file_tmp(upload_file: UploadFile) -> str:
    """Save uploaded file to a temporary location"""
    try:
        suffix = Path(upload_file.filename).suffix
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(upload_file.file.read())
            return tmp.name
    finally:
        upload_file.file.close()
# Gradio Interface
with gr.Blocks(title="Video Emotion Detection", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎭 Video Emotion Detection
    Upload a video to analyze facial emotions frame by frame.
    """)
    with gr.Row():
        with gr.Column():
            video_input = gr.Video(
                label="Upload Video",
                sources=["upload"]
            )
            submit_btn = gr.Button("Analyze Video", variant="primary")
        with gr.Column():
            output_json = gr.JSON(label="Analysis Results")
            gr.Markdown("""
            ### Results Interpretation
            - **Average Emotions**: Mean confidence score for each emotion across detected faces
            - **Dominant Emotion**: Emotion with the highest average confidence
            """)
    submit_btn.click(
        fn=gradio_analyze_video,
        inputs=video_input,
        outputs=output_json,
        api_name="predict"
    )
# FastAPI Endpoints
@app.post("/analyze")  # route decorator required to expose the endpoint; the path is assumed
async def analyze_video(file: UploadFile = File(...)):
    """Original FastAPI endpoint"""
    try:
        temp_path = save_upload_file_tmp(file)
        result = process_video(temp_path)
        os.unlink(temp_path)
        if not result["success"]:
            raise HTTPException(status_code=400, detail=result.get("error", "Processing failed"))
        return JSONResponse(content=result)
    except HTTPException:
        # Re-raise so the 400 above is not converted into a 500 below
        raise
    except Exception as e:
        if 'temp_path' in locals() and os.path.exists(temp_path):
            os.unlink(temp_path)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/", response_class=HTMLResponse)
async def root():
    """Redirect root to Gradio interface"""
    return """
    <html>
        <head>
            <title>Video Emotion Detection</title>
            <meta http-equiv="refresh" content="0; url=/gradio/" />
        </head>
        <body>
            <p>Redirecting to Gradio interface... <a href="/gradio">Click here</a> if not redirected.</p>
        </body>
    </html>
    """
# Mount Gradio app to FastAPI
app = gr.mount_gradio_app(app, demo, path="/gradio")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
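# Minimal client sketch (assumptions: the app is reachable at localhost:7860 and
# the "/analyze" route above is kept; adjust the URL for a deployed Space):
#
#     import requests
#     with open("sample_video.mp4", "rb") as f:
#         resp = requests.post("http://localhost:7860/analyze", files={"file": f})
#     print(resp.json())
#
# The Gradio UI itself is served at http://localhost:7860/gradio.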