Spaces:
Sleeping
Sleeping
| import cv2 | |
| import torch | |
| import numpy as np | |
| from PIL import Image | |
| import torchvision.transforms as transforms | |
| from ultralytics import YOLO | |
| import tempfile | |
| import time | |
| import os | |
| import json | |
| import gradio as gr | |
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| import uvicorn | |
# FastAPI application object that serves this module.
app = FastAPI()

# Module-level list of per-frame results for the largest detected face;
# process_video rebinds it at the start of every run.
largest_face_detections = []

# Weight file locations (resolved against the working directory) and the
# torch device used for emotion inference.
yolo_model_path = "yolov8n-face.pt"
emotion_model_path = "best_emotion_model.pth"
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Fail fast at import time when the face detector weights are missing.
if not os.path.exists(yolo_model_path):
    raise FileNotFoundError(f"YOLO model not found at {yolo_model_path}")
yolo_model = YOLO(yolo_model_path)
# Fail fast at import time when the emotion classifier weights are missing.
if not os.path.exists(emotion_model_path):
    raise FileNotFoundError(f"Emotion model not found at {emotion_model_path}")

from torch import nn


class EmotionCNN(nn.Module):
    """Single conv-block CNN that maps a 1-channel face image to emotion logits.

    The first linear layer expects 64 * 24 * 24 features, i.e. a 48x48 input
    after the 2x2 max-pool halves each spatial dimension.
    """

    def __init__(self, num_classes=7):
        super().__init__()
        # Attribute names and layer order define the state-dict keys, so they
        # must stay as they are for the saved checkpoint to load.
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.fc = nn.Sequential(
            nn.Linear(64 * 24 * 24, 1024),
            nn.ReLU(),
            nn.Linear(1024, num_classes),
        )

    def forward(self, x):
        """Return raw (un-normalized) class scores for the batch ``x``."""
        features = self.conv1(x)
        flattened = features.view(features.size(0), -1)
        return self.fc(flattened)


# Instantiate the classifier, restore its weights and switch to inference mode.
emotion_model = EmotionCNN(num_classes=7)
checkpoint = torch.load(emotion_model_path, map_location=device)
emotion_model.load_state_dict(checkpoint['model_state_dict'])
emotion_model.to(device)
emotion_model.eval()

# Emotion labels, indexed by the class id the model predicts.
emotions = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
def preprocess_face(face_img):
    """Turn a BGR face crop into the tensor the emotion model consumes.

    The crop is converted to grayscale, resized to 48x48, scaled to a tensor,
    normalized with mean 0.5 / std 0.5, and given a leading batch dimension.
    """
    # OpenCV delivers BGR; go through RGB so PIL's luminance conversion
    # weights the channels correctly.
    rgb_pixels = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
    gray_image = Image.fromarray(rgb_pixels).convert('L')

    pipeline = transforms.Compose([
        transforms.Resize((48, 48)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])
    ])
    # unsqueeze(0) adds the batch axis expected by the model.
    return pipeline(gray_image).unsqueeze(0)
def _classify_largest_face(frame):
    """Return ``{"emotion", "confidence"}`` for the largest face in ``frame``.

    Returns ``None`` when the detector yields no non-empty face crop.
    """
    best_crop = None
    best_area = 0
    for result in yolo_model(frame, stream=True):
        for box in result.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
            crop = frame[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            area = (x2 - x1) * (y2 - y1)
            # Strict comparison keeps the first box when areas tie.
            if area > best_area:
                best_area = area
                best_crop = crop

    if best_crop is None:
        return None

    # Only the largest face is reported, so the emotion model runs once per
    # frame instead of once per detected face.
    face_tensor = preprocess_face(best_crop).to(device)
    with torch.no_grad():
        logits = emotion_model(face_tensor)
        probabilities = torch.nn.functional.softmax(logits, dim=1)
    emotion_idx = torch.argmax(logits, dim=1).item()
    return {
        "emotion": emotions[emotion_idx],
        "confidence": probabilities[0][emotion_idx].item(),
    }


def process_video(video_path: str):
    """Process video and return emotion results.

    Reads the video frame by frame and records, for every frame that contains
    at least one face, the emotion and confidence of the largest face.

    Args:
        video_path: Path of the video file to analyze.

    Returns:
        A dict with ``success`` and ``message`` keys; on success it also has
        ``results``, a list of ``{"emotion", "confidence"}`` dicts (empty when
        no face was found).
    """
    global largest_face_detections
    # Collect into a list owned by this call and publish it under the
    # module-level name, so overlapping calls cannot append into each other's
    # results while the global still reflects the most recently started run.
    detections = []
    largest_face_detections = detections

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return {"success": False, "message": "Could not open video file"}
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            detection = _classify_largest_face(frame)
            if detection:
                detections.append(detection)
    finally:
        # Release the capture even when detection or inference raises.
        cap.release()

    if not detections:
        return {"success": True, "message": "No faces detected", "results": []}
    return {
        "success": True,
        "message": "Video processed",
        "results": detections
    }
async def handle_video(file: UploadFile = File(...)):
    """API endpoint for video emotion detection.

    Writes the upload to a temporary ``.mp4`` file, runs ``process_video`` on
    it and returns that result. Any exception is reported as a
    ``{"success": False, ...}`` payload instead of propagating.

    NOTE(review): no route decorator is attached to this coroutine in the
    visible source, so it does not appear to be registered on ``app`` --
    confirm the intended path and method.
    """
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
            # Remember the path before writing so a failed read/write is
            # still cleaned up below.
            video_path = tmp.name
            tmp.write(await file.read())
        return process_video(video_path)
    except Exception as e:
        return {"success": False, "message": "Error processing video", "error": str(e)}
    finally:
        # Always remove the temporary copy, including when processing raised.
        if video_path is not None:
            try:
                os.remove(video_path)
            except OSError:
                # Best-effort cleanup: a failed delete must not replace the
                # response that is already being returned.
                pass
| # Gradio UI | |
def gradio_process(video):
    """Run emotion analysis on the file submitted through the Gradio UI.

    Args:
        video: Value delivered by the ``gr.File`` input. Depending on the
            component's ``type`` this is a file path, an object exposing the
            path as ``.name``, raw bytes, or ``None`` when nothing was
            uploaded.

    Returns:
        The dict produced by ``process_video``, or a ``success: False``
        payload when no file was provided.

    Raises:
        TypeError: If ``video`` is none of the supported input kinds.
    """
    if video is None:
        return {"success": False, "message": "No video provided"}

    if isinstance(video, (bytes, bytearray, memoryview)):
        # Raw bytes: write them to a temporary file that OpenCV can open.
        video_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                video_path = tmp.name
                tmp.write(video)
            return process_video(video_path)
        finally:
            # Remove our temporary copy even when processing raises.
            if video_path is not None:
                try:
                    os.remove(video_path)
                except OSError:
                    pass  # best-effort cleanup

    # Otherwise Gradio has already stored the upload on disk: accept either
    # the path itself or a file wrapper exposing it as ``.name``. The file
    # belongs to Gradio, so it is not deleted here.
    if isinstance(video, (str, os.PathLike)):
        video_path = os.fspath(video)
    else:
        video_path = getattr(video, "name", None)
    if not isinstance(video_path, str):
        raise TypeError(
            f"Unsupported video input of type {type(video).__name__}"
        )
    return process_video(video_path)
# Gradio front end: a row with two columns -- upload control and button in
# the first, JSON results in the second.
with gr.Blocks() as demo:
    gr.Markdown("# Video Emotion Analysis")
    with gr.Row():
        with gr.Column():
            video_input = gr.File(label="Upload a video", file_types=[".mp4"])
            submit_btn = gr.Button("Analyze")
        with gr.Column():
            output = gr.JSON(label="Results")
    # Clicking the button sends the uploaded file through gradio_process and
    # shows the returned dict in the JSON panel.
    submit_btn.click(fn=gradio_process, inputs=video_input, outputs=output)

# Serve the Gradio UI from the root path of the FastAPI application.
app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)