Spaces:
Runtime error
Runtime error
| import os | |
| import subprocess | |
| import gradio as gr | |
| import whisper | |
| import yt_dlp | |
| import torch | |
| import numpy as np | |
| from moviepy.editor import VideoFileClip | |
| from transformers import AutoModelForAudioClassification, AutoFeatureExtractor | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| from transformers import BlipProcessor, BlipForConditionalGeneration | |
| import cv2 | |
| emotion_labels = ['anger', 'joy', 'optimism', 'sad'] | |
| def extract_audio_from_video(video_path): | |
| video_clip = VideoFileClip(video_path) | |
| audio_output = os.path.join('./', 'audio.mp3') | |
| audio_clip = video_clip.audio | |
| audio_clip.write_audiofile(audio_output) | |
| return audio_output | |
| def convert_mp3_to_wav(mp3_path): | |
| from pydub import AudioSegment | |
| audio = AudioSegment.from_mp3(mp3_path) | |
| wav_output = os.path.join('./', 'audio.wav') | |
| audio.export(wav_output, format="wav") | |
| return wav_output | |
| def process_text(text): | |
| model_name = "cardiffnlp/twitter-roberta-base-emotion" | |
| emotion_labels = ['anger', 'joy', 'optimism', 'sad'] | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = AutoModelForSequenceClassification.from_pretrained(model_name) | |
| inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512) | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| logits = outputs.logits | |
| emotion_probs = torch.softmax(logits, dim=-1).squeeze() | |
| predicted_emotion = emotion_labels[torch.argmax(emotion_probs)] | |
| emotion_dict = {emotion_labels[i]: emotion_probs[i].item() for i in range(len(emotion_labels))} | |
| return emotion_dict, predicted_emotion | |
| def preprocess_frame(frame): | |
| frame = cv2.resize(frame, (112, 112)) | |
| pixel_values = caption_processor(images=frame, return_tensors="pt").pixel_values | |
| return pixel_values | |
| def generate_caption(pixel_values): | |
| caption_ids = caption_model.generate(pixel_values) | |
| caption = caption_processor.batch_decode(caption_ids, skip_special_tokens=True)[0] | |
| return caption | |
| def predict_emotions(caption): | |
| inputs = emotion_tokenizer(caption, return_tensors='pt', truncation=True, padding=True) | |
| outputs = emotion_model(**inputs) | |
| emotion_probs = torch.softmax(outputs.logits, dim=1) | |
| predicted_emotions = {label: prob.item() for label, prob in zip(emotion_labels, emotion_probs[0])} | |
| return predicted_emotions | |
| # Models for image captioning and emotion analysis | |
| caption_model_name = "Salesforce/blip-image-captioning-base" | |
| caption_processor = BlipProcessor.from_pretrained(caption_model_name) | |
| caption_model = BlipForConditionalGeneration.from_pretrained(caption_model_name) | |
| emotion_model_name = "j-hartmann/emotion-english-distilroberta-base" | |
| emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name) | |
| emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name) | |
| def analyze_video(video=None, video_url=None): | |
| if video is not None: | |
| # If a video is uploaded, process the uploaded file | |
| video_path = video | |
| elif video_url: | |
| # For streaming YouTube video, just embed the link (assuming it's embedded using Gradio) | |
| video_path = None | |
| # If the video is uploaded, extract audio | |
| if video_path: | |
| audio_path = extract_audio_from_video(video_path) | |
| audio_wav_path = convert_mp3_to_wav(audio_path) | |
| model_whisper = whisper.load_model("base") | |
| result_whisper = model_whisper.transcribe(audio_wav_path) | |
| transcript = result_whisper['text'] | |
| emotion_dict_text, predicted_emotion_text = process_text(transcript) | |
| # Frame-wise emotion detection from the video | |
| n_frame_interval = 120 | |
| emotion_vectors_video = [] | |
| video_capture = cv2.VideoCapture(video_path) | |
| total_frames_video = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| frame_count_video = 0 | |
| while video_capture.isOpened(): | |
| ret_video, frame_video = video_capture.read() | |
| if not ret_video or frame_count_video > total_frames_video: | |
| break | |
| if frame_count_video % n_frame_interval == 0: | |
| pixel_values_video = preprocess_frame(frame_video) | |
| caption_video = generate_caption(pixel_values_video) | |
| predicted_emotions_video = predict_emotions(caption_video) | |
| emotion_vectors_video.append(np.array(list(predicted_emotions_video.values()))) | |
| frame_count_video += 1 | |
| video_capture.release() | |
| average_emotion_vector_video = np.mean(emotion_vectors_video, axis=0) | |
| combined_emotion_vector_final = np.concatenate((np.array(list(emotion_dict_text.values())), average_emotion_vector_video)) | |
| final_most_predicted_index = np.argmax(combined_emotion_vector_final) | |
| final_most_predicted_emotion = list(emotion_dict_text.keys())[final_most_predicted_index] | |
| return transcript, predicted_emotion_text, final_most_predicted_emotion | |
| else: | |
| # For streaming, return an empty analysis or handle the embedding in the Gradio UI | |
| return None, "Streaming video detected (no processing).", "N/A" | |
| # Gradio Interface | |
| with gr.Blocks() as iface: | |
| gr.Markdown("# 🎥 Multimodal Emotion Recognition\nUpload a video or input a YouTube video URL to analyze emotions from audio and video frames.") | |
| with gr.Tabs(): | |
| with gr.TabItem("Upload Video"): | |
| video_file = gr.File(label="Upload Video File", file_types=["video"]) | |
| submit_button_file = gr.Button("Analyze Uploaded Video") | |
| with gr.TabItem("YouTube URL"): | |
| video_url = gr.Textbox(label="YouTube Video URL", placeholder="Enter YouTube video URL") | |
| submit_button_url = gr.Button("Analyze YouTube Video") | |
| with gr.Row(): | |
| transcript_output = gr.Textbox(label="Transcript", interactive=False) | |
| audio_emotion_output = gr.Textbox(label="Emotion from Audio and Text", interactive=False) | |
| visual_emotion_output = gr.Textbox(label="Emotion from Video", interactive=False) | |
| # For uploaded video | |
| submit_button_file.click(analyze_video, inputs=video_file, outputs=[transcript_output, audio_emotion_output, visual_emotion_output]) | |
| # For YouTube streaming (no downloading) | |
| submit_button_url.click(analyze_video, inputs=video_url, outputs=[transcript_output, audio_emotion_output, visual_emotion_output]) | |
| if __name__ == "__main__": | |
| iface.launch() | |