import os
import asyncio  # needed to reuse the async handler from the Gradio wrapper below
import cv2
import torch
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
import time
import json
from typing import Dict, Any
from fastapi import FastAPI, HTTPException, File, UploadFile
from pydantic import BaseModel
import gradio as gr
import tempfile

app = FastAPI()

# Global variable to store the history of largest-face detections
largest_face_detections = []

# EmotionCNN model definition. The original layers were elided here ("same as
# in your original code"); the stand-in below assumes 48x48 grayscale input
# and must be replaced with your real architecture so that load_state_dict()
# matches the saved checkpoint.
class EmotionCNN(torch.nn.Module):
    def __init__(self, num_classes=7):
        super(EmotionCNN, self).__init__()
        # Placeholder layers -- substitute your convolutional stack here
        self.features = torch.nn.Sequential(
            torch.nn.Conv2d(1, 32, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2),
        )
        self.classifier = torch.nn.Linear(32 * 24 * 24, num_classes)

    def forward(self, x):
        return self.classifier(torch.flatten(self.features(x), 1))

# Load emotion model
def load_emotion_model(model_path, device='cuda' if torch.cuda.is_available() else 'cpu'):
    checkpoint = torch.load(model_path, map_location=device)
    model = EmotionCNN(num_classes=7)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()
    return model
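
# preprocess_face() is called in the frame loop below but was missing from the
# original snippet. This is a minimal sketch assuming the model was trained on
# 48x48 grayscale crops normalized with mean 0.5 / std 0.5 (matching the
# placeholder EmotionCNN above); adjust it to your actual training pipeline.
def preprocess_face(face_img: np.ndarray) -> torch.Tensor:
    gray = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
    transform = transforms.Compose([
        transforms.ToPILImage(),                      # (H, W) uint8 -> PIL 'L' image
        transforms.Resize((48, 48)),
        transforms.ToTensor(),                        # -> float tensor (1, 48, 48) in [0, 1]
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ])
    return transform(gray).unsqueeze(0)               # add batch dim -> (1, 1, 48, 48)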

# Process the uploaded video (either MP4 or WebM)
async def process_video(video_file: UploadFile) -> Dict[str, Any]:
    global largest_face_detections
    largest_face_detections = []  # Reset detections for each new video

    # Paths to the face detector and the emotion model
    face_cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    emotion_model_path = "best_emotion_model.pth"
    if not os.path.exists(face_cascade_path):
        raise HTTPException(status_code=400, detail="Face cascade classifier not found")
    if not os.path.exists(emotion_model_path):
        raise HTTPException(status_code=400, detail="Emotion model not found")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    try:
        face_cascade = cv2.CascadeClassifier(face_cascade_path)
        emotion_model = load_emotion_model(emotion_model_path, device)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error loading models: {str(e)}")

    # Label order must match the order used when training the model
    emotions = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']

    # Save the uploaded video to a temporary directory, keeping the original
    # extension so OpenCV picks the right decoder
    temp_dir = tempfile.mkdtemp()
    suffix = os.path.splitext(video_file.filename or "")[1] or ".mp4"
    video_path = os.path.join(temp_dir, f"uploaded_video{suffix}")
    with open(video_path, "wb") as f:
        f.write(await video_file.read())

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise HTTPException(status_code=400, detail=f"Could not open video file at {video_path}")

    frame_count = 0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1

        # Keep only the largest detected face in each frame
        largest_face_area = 0
        current_detection = None

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
        for (x, y, w, h) in faces:
            face_area = w * h

            # Crop the face with a small margin, clamped to the frame bounds
            margin = 20
            x1 = max(0, x - margin)
            y1 = max(0, y - margin)
            x2 = min(frame.shape[1], x + w + margin)
            y2 = min(frame.shape[0], y + h + margin)
            face_img = frame[y1:y2, x1:x2]
            if face_img.size == 0 or face_img.shape[0] < 20 or face_img.shape[1] < 20:
                continue

            face_tensor = preprocess_face(face_img)
            with torch.no_grad():
                face_tensor = face_tensor.to(device)
                output = emotion_model(face_tensor)
                probabilities = torch.nn.functional.softmax(output, dim=1)
                emotion_idx = torch.argmax(output, dim=1).item()
                confidence = probabilities[0][emotion_idx].item()
                emotion = emotions[emotion_idx]

            if face_area > largest_face_area:
                largest_face_area = face_area
                current_detection = {
                    'emotion': emotion,
                    'confidence': confidence,
                    'timestamp': time.time(),
                    'frame_number': frame_count
                }

        if current_detection:
            largest_face_detections.append(current_detection)

    cap.release()

    if not largest_face_detections:
        return {
            "success": True,
            "message": "No faces detected in video",
            "results": [],
            "error": None
        }

    emotions_count = {}
    for detection in largest_face_detections:
        emotion = detection['emotion']
        emotions_count[emotion] = emotions_count.get(emotion, 0) + 1
    dominant_emotion = max(emotions_count.items(), key=lambda x: x[1])[0]

    return {
        "success": True,
        "message": "Video processed successfully",
        "results": {
            "detections": largest_face_detections,
            "summary": {
                "total_frames": total_frames,
                "total_detections": len(largest_face_detections),
                "emotions_count": emotions_count,
                "dominant_emotion": dominant_emotion
            }
        },
        "error": None
    }

# Request model for path-based processing (currently unused)
class VideoRequest(BaseModel):
    path: str

# FastAPI endpoint for processing the uploaded video file. The original
# snippet omitted the route decorator; the "/process-video" path below is an
# assumed name -- rename it to whatever your clients expect.
@app.post("/process-video")
async def process_video_request(file: UploadFile = File(...)):
    try:
        results = await process_video(file)
        return results
    except HTTPException:
        raise  # preserve the original status code (e.g. 400) instead of wrapping it
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
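
# Example request against the endpoint above (path as assumed in the
# decorator; adjust host, port, and file name to your deployment):
#
#   curl -X POST http://localhost:7860/process-video -F "file=@sample.webm"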

# Gradio interface
def gradio_interface():
    def process_gradio_video(video_file):
        # Gradio passes the uploaded video as a local file path, while
        # process_video() expects an UploadFile, so wrap the file before
        # reusing the same coroutine (keyword arguments per recent Starlette
        # versions -- adjust if yours differ).
        with open(video_file, "rb") as f:
            upload = UploadFile(file=f, filename=os.path.basename(video_file))
            return asyncio.run(process_video(upload))

    interface = gr.Interface(
        fn=process_gradio_video,
        inputs=gr.Video(),  # handles MP4, WebM, and other formats Gradio supports
        outputs="json"
    )
    return interface

# Launch the Gradio interface (blocking call)
if __name__ == "__main__":
    gradio_interface().launch(server_name="0.0.0.0", server_port=7860, share=True)
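
# Note: launch() serves only the Gradio UI; the FastAPI route defined above is
# not exposed by it. A sketch of one alternative, assuming a Gradio version
# that provides gr.mount_gradio_app, is to mount the UI onto the FastAPI app
# and serve both together (module name assumed to be app.py):
#
#   app = gr.mount_gradio_app(app, gradio_interface(), path="/ui")
#   # run with: uvicorn app:app --host 0.0.0.0 --port 7860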