# NOTE: Captured from a HuggingFace Space whose page reported "Runtime error" at snapshot time.
| import gradio as gr | |
| import pandas as pd | |
| import cv2 | |
| import torch | |
| import tempfile | |
| import os | |
| import librosa | |
| from fer import FER | |
| from transformers import AutoModelForAudioClassification, pipeline | |
| from moviepy.editor import VideoFileClip, AudioFileClip | |
| import numpy as np | |
| from torch.nn.functional import softmax | |
| import whisper_timestamped as whisper | |
| from translate import Translator | |
# --- Pre-trained model setup (module level; runs once at import, downloads on first run) ---
# Speech-emotion model; trust_remote_code executes the repo's custom model code.
audio_model = AutoModelForAudioClassification.from_pretrained("3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True)
# Facial-expression recognizer; MTCNN face detector is slower but more accurate than the default cascade.
face_detector = FER(mtcnn=True)
# Text emotion classifier; top_k=None returns a score for every label, not just the best one.
classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=None)
# Normalization statistics shipped in the audio model's config (used to standardize raw waveforms).
mean = audio_model.config.mean
std = audio_model.config.std
# Function to extract audio from video for audio emotion analysis
def extract_audio_from_video(video_path):
    """Extract the audio track of *video_path* into a temporary 16-bit PCM WAV.

    Returns the temp-file path; the caller is responsible for deleting it.
    Raises if the video has no audio track or the clip cannot be opened.
    """
    # Create the temp file first and close its handle, so ffmpeg can write to
    # the path freely (writing while the handle is open breaks on Windows).
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        temp_audio_path = temp_audio_file.name
    video_clip = VideoFileClip(video_path)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            raise ValueError("Video has no audio track.")
        # pcm_s16le keeps the WAV uncompressed so librosa can load it directly.
        audio_clip.write_audiofile(temp_audio_path, codec="pcm_s16le")
    finally:
        # Close the clip (and its ffmpeg reader subprocess) even on failure;
        # the original version leaked one reader per call.
        video_clip.close()
    return temp_audio_path
# Function to perform audio emotion detection per second
def process_audio_and_detect_emotions(audio_clip):
    """Run the SER model on one audio segment and return a one-row DataFrame.

    Only seven of the model's output classes are reported; their softmax
    probabilities are renormalized so the row sums to 1.
    """
    samples = np.array(audio_clip)
    wav_batch = torch.tensor(samples).unsqueeze(0)
    attention_mask = torch.ones(1, len(samples))
    with torch.no_grad():
        output = audio_model(wav_batch, attention_mask)
    # Some model versions return a ModelOutput, others a plain tuple.
    logits = output.logits if hasattr(output, 'logits') else output[0]
    # Class indices of interest (index 6 is intentionally skipped).
    label_map = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
    kept = softmax(logits, dim=-1).squeeze(0)[list(label_map.keys())]
    kept = kept / kept.sum()
    return pd.DataFrame([kept.numpy()], columns=list(label_map.values()))
# Function to analyze audio emotions
def analyze_audio_emotions(video_path):
    """Per-second audio emotion analysis of *video_path*.

    Returns (status message, emotions DataFrame or None, xlsx path or None).
    The temporary WAV extracted from the video is always deleted.
    """
    temp_audio_path = None
    try:
        temp_audio_path = extract_audio_from_video(video_path)
        sample_rate = audio_model.config.sampling_rate
        raw_wav, _ = librosa.load(temp_audio_path, sr=sample_rate)
        # Standardize with the model's published stats; epsilon guards against std == 0.
        norm_wav = (raw_wav - mean) / (std + 0.000001)
        second_marks = []
        per_second_frames = []
        # One model call per second of audio (the final chunk may be shorter).
        for offset in range(0, len(norm_wav), sample_rate):
            segment = norm_wav[offset:offset + sample_rate]
            per_second_frames.append(process_audio_and_detect_emotions(segment))
            second_marks.append(offset / sample_rate)
        emotions_df = pd.concat(per_second_frames, ignore_index=True)
        emotions_df.insert(0, "Time(s)", second_marks)
        # Align column names with the other analyzers' lowercase labels.
        rename_map = {'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise', 'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'}
        emotions_df.rename(columns=rename_map, inplace=True)
        emotions_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        emotions_df.to_excel(emotions_xlsx_path, index=False)
        return "Audio emotion detection completed successfully.", emotions_df, emotions_xlsx_path
    except Exception as e:
        return f"Error during audio emotion detection: {str(e)}", None, None
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
# Function to detect facial emotions
def detect_faces_and_emotions(video_path):
    """Annotate every frame of *video_path* with face boxes and emotion scores.

    Returns (status, per-second averaged DataFrame, xlsx path, annotated video
    path); the last three are None on failure. Temporary intermediates are
    always removed and all OpenCV/MoviePy handles are released.
    """
    temp_video_path = None
    temp_audio_path = None
    cap = None
    out = None
    original_video = None
    processed_video = None
    audio = None
    emotions_data = []
    try:
        temp_video_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
        temp_audio_path = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False).name
        output_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        # Save the original soundtrack so it can be re-muxed onto the annotated video.
        original_video = VideoFileClip(video_path)
        original_audio = original_video.audio
        if original_audio is None:
            raise Exception("Error: Video has no audio track.")
        original_audio.write_audiofile(temp_audio_path)
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise Exception("Error: Could not open video file.")
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        # Guard against fps == 0 (otherwise the timestamp math raises ZeroDivisionError).
        if fps <= 0:
            raise Exception("Error: Could not determine video frame rate.")
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_video_path, fourcc, fps, (frame_width, frame_height))
        frame_number = 0
        while True:
            ret, frame = cap.read()
            # Break (not continue) on a missing frame; the original's
            # `continue` could spin forever without advancing.
            if not ret or frame is None:
                break
            time_seconds = round(frame_number / fps)
            for face in face_detector.detect_emotions(frame):
                box = face["box"]
                emotions = face["emotions"]
                emotions["Time(s)"] = time_seconds
                emotions_data.append(emotions)
                cv2.rectangle(frame, (box[0], box[1]), (box[0] + box[2], box[1] + box[3]), (0, 155, 255), 2)
                for index, (emotion_name, score) in enumerate(emotions.items()):
                    # Grey out negligible scores so dominant emotions stand out.
                    color = (211, 211, 211) if score < 0.01 else (255, 0, 0)
                    label = "{}: {:.2f}".format(emotion_name, score)
                    cv2.putText(frame, label, (box[0], box[1] + box[3] + 30 + index * 15),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
            out.write(frame)
            frame_number += 1
        cap.release()
        out.release()
        cap = None
        out = None
        # Fail with a clear message instead of a cryptic KeyError downstream.
        if not emotions_data:
            raise Exception("Error: No faces detected in the video.")
        emotions_df = pd.DataFrame(emotions_data)
        emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
        max_time = emotions_df['Time(s)'].max()
        all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
        avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
        # Left-merge so seconds with no detected face still appear (filled with 0).
        df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left')
        df_merged.fillna(0, inplace=True)
        df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
        df_merged.to_excel(output_xlsx_path, index=False)
        processed_video = VideoFileClip(temp_video_path)
        audio = AudioFileClip(temp_audio_path)
        final_video = processed_video.set_audio(audio)
        output_video_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
        final_video.write_videofile(output_video_path, codec='libx264')
        return "Face and emotion detection completed successfully.", df_merged, output_xlsx_path, output_video_path
    except Exception as e:
        return f"Error during processing: {str(e)}", None, None, None
    finally:
        # Release OpenCV and MoviePy handles even when an exception unwinds;
        # the original leaked them on every failure path.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
        for clip in (audio, processed_video, original_video):
            if clip is not None:
                clip.close()
        if temp_video_path and os.path.exists(temp_video_path):
            os.remove(temp_video_path)
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
# Function to analyze text emotions
def process_video_text(video_path):
    """Transcribe *video_path* with Whisper, translate ko->en, and score text emotions.

    Returns (per-second words DataFrame, segments DataFrame, words xlsx path,
    segments xlsx path, status message); DataFrames/paths are None on failure.
    The temporary WAV is always deleted and the video clip always closed.
    """
    temp_audio_path = None
    video_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        # Create the temp path with the handle closed so ffmpeg can write to it.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_path = temp_audio_file.name
        video_clip.audio.write_audiofile(temp_audio_path)
        audio = whisper.load_audio(temp_audio_path)
        # NOTE(review): the Whisper model is re-loaded on every call; caching it
        # at module level (like the other models) would speed up repeat runs.
        model = whisper.load_model("medium", device="cpu")
        result = whisper.transcribe(model, audio)
        # Collect word-level data with timestamps.
        word_texts = []
        word_starts = []
        word_ends = []
        word_confidences = []
        for segment in result['segments']:
            for word in segment['words']:
                word_texts.append(word['text'])
                word_starts.append(word['start'])
                word_ends.append(word['end'])
                word_confidences.append(word['confidence'])
        # Create segments DataFrame.
        segments_data = [{'text': seg['text'], 'start': seg['start'], 'end': seg['end'], 'confidence': seg['confidence']} for seg in result['segments']]
        segments_df = pd.DataFrame(segments_data)
        # Translate from Korean to English so the English-only classifier applies.
        translator = Translator(from_lang='ko', to_lang='en')
        segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: translator.translate(x))
        # Apply the sentiment-analysis model to the translated text.
        segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(lambda x: {entry['label']: entry['score'] for entry in classifier(x)[0]})
        # Split the per-label scores into individual columns.
        sentiment_df = pd.concat([segments_df, segments_df['Sentiment_Scores'].apply(pd.Series)], axis=1)
        # Create words DataFrame.
        words_df = pd.DataFrame({
            'text': word_texts,
            'start': word_starts,
            'end': word_ends,
            'confidence': word_confidences
        })
        # Round up the start time to the next second for per-second grouping.
        words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))
        # Group words by second, concatenating words that share a second.
        words_grouped = words_df.groupby('second').agg({
            'text': lambda x: ' '.join(x),
            'start': 'min',
            'end': 'max',
            'confidence': 'mean'
        }).reset_index()
        # Fill in missing seconds from 0 through the end of the video.
        max_second = int(video_clip.duration)
        all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})
        words_grouped = all_seconds.merge(words_grouped, on='second', how='left')
        # BUG FIX: the original used chained `words_grouped['text'].fillna('', inplace=True)`,
        # which mutates a temporary under pandas copy-on-write (deprecated, breaks in pandas 3).
        words_grouped['text'] = words_grouped['text'].fillna('')
        words_grouped.fillna(0, inplace=True)
        # Initialize emotion columns, then copy each matching segment's scores
        # onto every second that falls inside the segment's time span.
        emotion_columns = sentiment_df.columns.difference(['text', 'start', 'end', 'confidence', 'Translated_Text', 'Sentiment_Scores'])
        for col in emotion_columns:
            words_grouped[col] = np.nan
        for i, row in words_grouped.iterrows():
            matching_segment = sentiment_df[(sentiment_df['start'] <= row['start']) & (sentiment_df['end'] >= row['end'])]
            if not matching_segment.empty:
                for emotion in emotion_columns:
                    words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]
        # Seconds with no matching segment get zero scores.
        words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)
        # Save both DataFrames to XLSX files for download.
        segments_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        words_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        sentiment_df.to_excel(segments_xlsx_path, index=False)
        words_grouped.to_excel(words_xlsx_path, index=False)
        return words_grouped, sentiment_df, words_xlsx_path, segments_xlsx_path, "Text emotion processing completed successfully!"
    except Exception as e:
        return None, None, None, None, f"Error during text emotion processing: {str(e)}"
    finally:
        # Close the clip's ffmpeg reader (the original leaked it) and remove the temp WAV.
        if video_clip is not None:
            video_clip.close()
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
# Gradio App
def gradio_app():
    """Build and launch the Gradio UI wiring the three analysis pipelines."""
    demo = gr.Blocks()
    with demo:
        gr.Markdown("## I-MEQ: Emotion Monitoring System")
        video_in = gr.Video(label="Upload your video for analysis", height=600)
        with gr.Row():
            btn_audio = gr.Button("Analyze Audio Emotions")
            btn_face = gr.Button("Analyze Facial Emotions")
            btn_text = gr.Button("Transcribe & Analyze Textual Emotions")
        # Three result columns, one per pipeline.
        with gr.Row():
            with gr.Column():
                audio_status = gr.Textbox(label="Audio Emotion Analysis Status")
                audio_table = gr.Dataframe(label="Audio Emotions DataFrame", interactive=False)
                audio_download = gr.File(label="Download Audio Emotions XLSX")
            with gr.Column():
                face_status = gr.Textbox(label="Facial Emotion Analysis Status")
                face_table = gr.Dataframe(label="Facial Emotions DataFrame", interactive=False)
                face_download = gr.File(label="Download Facial Emotions XLSX")
                annotated_video_download = gr.File(label="Download Processed Video")
            with gr.Column():
                text_status = gr.Textbox(label="Text Sentiment Analysis Status")
                words_table = gr.Dataframe(label="Words DataFrame", interactive=False)
                segments_table = gr.Dataframe(label="Segments DataFrame", interactive=False)
                words_download = gr.File(label="Download Words XLSX")
                segments_download = gr.File(label="Download Segments XLSX")
        # Wire each button to its pipeline; output order must match each
        # function's return tuple.
        btn_audio.click(
            analyze_audio_emotions,
            inputs=video_in,
            outputs=[audio_status, audio_table, audio_download]
        )
        btn_face.click(
            detect_faces_and_emotions,
            inputs=video_in,
            outputs=[face_status, face_table, face_download, annotated_video_download]
        )
        btn_text.click(
            process_video_text,
            inputs=video_in,
            outputs=[words_table, segments_table, words_download, segments_download, text_status]
        )
    demo.launch()

# Start the Gradio app
gradio_app()