import gradio as gr
import pyvista as pv
from pyvista import examples
import numpy as np
import librosa
import requests
from io import BytesIO
from PIL import Image
import os
from tensorflow.keras.models import load_model
from faster_whisper import WhisperModel
import random
from textblob import TextBlob
import torch
import scipy.io.wavfile
from transformers import AutoProcessor, MusicgenForConditionalGeneration
import tempfile
import base64
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import soundfile as sf
from pydub import AudioSegment
import math
import json
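# Note (deployment assumption): pydub's AudioSegment relies on an ffmpeg (or libav)
# binary being available on the system PATH to decode/encode most compressed audio
# formats; it is an external dependency, not something pip installs with pydub.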
# Load the emotion prediction model
def load_emotion_model(model_path):
    try:
        model = load_model(model_path)
        print("Emotion model loaded successfully")
        return model
    except Exception as e:
        print("Error loading emotion prediction model:", e)
        return None

model_path = 'mymodel_SER_LSTM_RAVDESS.h5'
model = load_emotion_model(model_path)
# Initialize WhisperModel
model_size = "small"
model2 = WhisperModel(model_size, device="cpu", compute_type="int8")
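# The "small" faster-whisper weights are downloaded on first use; compute_type="int8"
# keeps memory usage low for CPU-only inference. A larger size (e.g. "medium") should
# improve transcription quality at the cost of speed.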
# Load MusicGen model
def load_musicgen_model():
    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        processor = AutoProcessor.from_pretrained("facebook/musicgen-small")
        music_model = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small")
        music_model.to(device)
        print("MusicGen model loaded successfully")
        return processor, music_model, device
    except Exception as e:
        print("Error loading MusicGen model:", e)
        return None, None, None

processor, music_model, device = load_musicgen_model()
# Function to chunk audio into segments
def chunk_audio(audio_path, chunk_duration=10):
    """Split audio into chunks and return the list of chunk file paths."""
    try:
        # Load audio file
        audio = AudioSegment.from_file(audio_path)
        duration_ms = len(audio)
        chunk_ms = chunk_duration * 1000
        # Validate chunk duration
        if chunk_duration <= 0:
            raise ValueError("Chunk duration must be positive")
        if chunk_duration > duration_ms / 1000:
            # If chunk duration is longer than the audio, return the whole audio
            return [audio_path], 1
        chunk_files = []
        # Calculate number of chunks
        num_chunks = math.ceil(duration_ms / chunk_ms)
        for i in range(num_chunks):
            start_ms = i * chunk_ms
            end_ms = min((i + 1) * chunk_ms, duration_ms)
            # Extract chunk
            chunk = audio[start_ms:end_ms]
            # Save chunk to a temporary file
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
                chunk.export(tmp_file.name, format="wav")
                chunk_files.append(tmp_file.name)
        return chunk_files, num_chunks
    except Exception as e:
        print("Error chunking audio:", e)
        # Return the original file as a single chunk if chunking fails
        return [audio_path], 1
# Function to transcribe audio
def transcribe(wav_filepath):
    try:
        segments, _ = model2.transcribe(wav_filepath, beam_size=5)
        return "".join([segment.text for segment in segments])
    except Exception as e:
        print("Error transcribing audio:", e)
        return "Transcription failed"
# Function to extract MFCC features from audio
def extract_mfcc(wav_file_name):
    try:
        y, sr = librosa.load(wav_file_name)
        mfccs = np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40).T, axis=0)
        return mfccs
    except Exception as e:
        print("Error extracting MFCC features:", e)
        return None
# Emotions dictionary
emotions = {0: 'neutral', 1: 'calm', 2: 'happy', 3: 'sad', 4: 'angry', 5: 'fearful', 6: 'disgust', 7: 'surprised'}
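# The index-to-label mapping above is assumed to match the 8-class RAVDESS ordering the
# LSTM classifier was trained with; if the model used a different label order, the
# returned emotion names will be wrong even when the prediction itself is correct.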
# Function to predict emotion from audio
def predict_emotion_from_audio(wav_filepath):
    try:
        if model is None:
            return "Model not loaded"
        test_point = extract_mfcc(wav_filepath)
        if test_point is not None:
            test_point = np.reshape(test_point, newshape=(1, 40, 1))
            predictions = model.predict(test_point)
            predicted_emotion_label = np.argmax(predictions[0])
            return emotions.get(predicted_emotion_label, "Unknown emotion")
        else:
            return "Error: Unable to extract features"
    except Exception as e:
        print("Error predicting emotion:", e)
        return "Prediction error"
# Function to analyze sentiment from text
def analyze_sentiment(text):
    try:
        if not text or text.strip() == "":
            return "neutral", 0.0
        analysis = TextBlob(text)
        polarity = analysis.sentiment.polarity
        if polarity > 0.1:
            sentiment = "positive"
        elif polarity < -0.1:
            sentiment = "negative"
        else:
            sentiment = "neutral"
        return sentiment, polarity
    except Exception as e:
        print("Error analyzing sentiment:", e)
        return "neutral", 0.0
# Function to get image prompt based on sentiment
def get_image_prompt(sentiment, transcribed_text, chunk_idx, total_chunks):
    base_prompt = f"Chunk {chunk_idx+1}/{total_chunks}: "
    if sentiment == "positive":
        return base_prompt + f"Generate a vibrant, uplifting equirectangular 360 image texture with bright colors, joyful atmosphere, and optimistic vibes representing: [{transcribed_text}]. The scene should evoke happiness and positivity."
    elif sentiment == "negative":
        return base_prompt + f"Generate a moody, dramatic equirectangular 360 image texture with dark tones, intense atmosphere, and emotional depth representing: [{transcribed_text}]. The scene should convey melancholy and intensity."
    else:  # neutral
        return base_prompt + f"Generate a balanced, serene equirectangular 360 image texture with harmonious colors, peaceful atmosphere, and calm vibes representing: [{transcribed_text}]. The scene should evoke tranquility and balance."
# Function to get music prompt based on emotion
def get_music_prompt(emotion, transcribed_text, chunk_idx, total_chunks):
    base_prompt = f"Chunk {chunk_idx+1}/{total_chunks}: "
    emotion_prompts = {
        'neutral': f"Create ambient, background music with neutral tones, subtle melodies, and unobtrusive atmosphere that complements: {transcribed_text}. The music should be calm and balanced.",
        'calm': f"Generate soothing, peaceful music with gentle melodies, soft instrumentation, and relaxing vibes that represents: {transcribed_text}. The music should evoke tranquility and serenity.",
        'happy': f"Create joyful, upbeat music with cheerful melodies, bright instrumentation, and energetic rhythms that celebrates: {transcribed_text}. The music should evoke happiness and positivity.",
        'sad': f"Generate emotional, melancholic music with poignant melodies, soft strings, and heartfelt atmosphere that reflects: {transcribed_text}. The music should evoke sadness and reflection.",
        'angry': f"Create intense, powerful music with driving rhythms, aggressive instrumentation, and strong dynamics that expresses: {transcribed_text}. The music should evoke anger and intensity.",
        'fearful': f"Generate suspenseful, tense music with eerie melodies, atmospheric sounds, and unsettling vibes that represents: {transcribed_text}. The music should evoke fear and anticipation.",
        'disgust': f"Create dark, unsettling music with dissonant harmonies, unusual sounds, and uncomfortable atmosphere that reflects: {transcribed_text}. The music should evoke discomfort and unease.",
        'surprised': f"Generate dynamic, unexpected music with sudden changes, playful melodies, and surprising elements that represents: {transcribed_text}. The music should evoke surprise and wonder."
    }
    return base_prompt + emotion_prompts.get(emotion.lower(),
        f"Create background music with {emotion} atmosphere that represents: {transcribed_text}")
# Function to generate music with MusicGen (using acoustic emotion prediction)
def generate_music(transcribed_text, emotion_prediction, chunk_idx, total_chunks):
    try:
        if processor is None or music_model is None:
            return None
        # Get specific prompt based on emotion
        prompt = get_music_prompt(emotion_prediction, transcribed_text, chunk_idx, total_chunks)
        # Limit prompt length to avoid model issues
        if len(prompt) > 200:
            prompt = prompt[:200] + "..."
        inputs = processor(
            text=[prompt],
            padding=True,
            return_tensors="pt",
        ).to(device)
        # Generate audio
        audio_values = music_model.generate(**inputs, max_new_tokens=512)
        # Convert to a numpy array and get the sample rate
        sampling_rate = music_model.config.audio_encoder.sampling_rate
        audio_data = audio_values[0, 0].cpu().numpy()
        # Normalize audio data (guard against division by zero on silent output)
        peak = np.max(np.abs(audio_data))
        if peak > 0:
            audio_data = audio_data / peak
        # Create a temporary file to save the audio
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            scipy.io.wavfile.write(tmp_file.name, rate=sampling_rate, data=audio_data)
        return tmp_file.name
    except Exception as e:
        print("Error generating music:", e)
        return None
# --- DeepAI Image Generation (Text2Img) ---
api_key = os.getenv("DeepAI_api_key")
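# The DeepAI key is read from the DeepAI_api_key environment variable (e.g. a Space
# secret); when it is missing, generate_image() falls back to a plain white placeholder
# texture instead of calling the API.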
def generate_image(sentiment_prediction, transcribed_text, chunk_idx, total_chunks):
    try:
        if not api_key:
            # Fallback white image if no API key is configured
            return Image.new('RGB', (1024, 512), color='white')
        # Get specific prompt based on sentiment
        prompt = get_image_prompt(sentiment_prediction, transcribed_text, chunk_idx, total_chunks)
        # Make request to DeepAI text2img API
        response = requests.post(
            "https://api.deepai.org/api/text2img",
            data={
                'text': prompt,
                'width': 1024,
                'height': 512,
                'image_generator_version': 'hd'
            },
            headers={'api-key': api_key}
        )
        data = response.json()
        if 'output_url' in data:
            # Download the generated image
            img_resp = requests.get(data['output_url'])
            return Image.open(BytesIO(img_resp.content))
        else:
            print("Error in DeepAI response:", data)
            # Return a fallback image
            return Image.new('RGB', (1024, 512), color='white')
    except Exception as e:
        print("Error generating image:", e)
        # Return a fallback image
        return Image.new('RGB', (1024, 512), color='white')
# Function to process a single chunk
def process_chunk(chunk_path, chunk_idx, total_chunks, generate_audio=True):
    try:
        # Get acoustic emotion prediction (for music)
        emotion_prediction = predict_emotion_from_audio(chunk_path)
        # Get transcribed text
        transcribed_text = transcribe(chunk_path)
        # Analyze sentiment of transcribed text (for image)
        sentiment, polarity = analyze_sentiment(transcribed_text)
        # Generate image using SENTIMENT analysis with specific prompt
        image = generate_image(sentiment, transcribed_text, chunk_idx, total_chunks)
        # Generate music only if audio generation is enabled
        music_path = None
        if generate_audio:
            music_path = generate_music(transcribed_text, emotion_prediction, chunk_idx, total_chunks)
        return {
            'chunk_index': chunk_idx + 1,
            'emotion': emotion_prediction,
            'transcription': transcribed_text,
            'sentiment': sentiment,
            'image': image,
            'music': music_path
        }
    except Exception as e:
        print(f"Error processing chunk {chunk_idx + 1}:", e)
        # Return a fallback result with all required keys
        return {
            'chunk_index': chunk_idx + 1,
            'emotion': "Error",
            'transcription': "Transcription failed",
            'sentiment': "error",
            'image': Image.new('RGB', (1024, 512), color='white'),
            'music': None
        }
# Function to get predictions for all chunks
def get_predictions(audio_input, generate_audio=True, chunk_duration=10):
    # Chunk the audio into segments
    chunk_files, total_chunks = chunk_audio(audio_input, chunk_duration)
    results = []
    # Process each chunk
    for i, chunk_path in enumerate(chunk_files):
        print(f"Processing chunk {i+1}/{total_chunks} ({chunk_duration}s each)")
        result = process_chunk(chunk_path, i, total_chunks, generate_audio)
        results.append(result)
    # Clean up temporary chunk files
    for chunk_path in chunk_files:
        try:
            if chunk_path != audio_input:  # Don't delete the original input file
                os.unlink(chunk_path)
        except OSError:
            pass
    return results
# Function to clear audio input and all outputs
def clear_all():
    # Create a list with None for all outputs
    outputs = [None]  # For audio input
    # For group components (set to invisible)
    outputs.extend([gr.Group(visible=False)] * len(group_components))
    # For all output containers (set to None)
    outputs.extend([None] * (len(output_containers) * 5))
    # For loading indicator (empty HTML)
    outputs.append(gr.HTML(""))
    # For chunk duration (reset to 10)
    outputs.append(10)
    # For example selector (reset to None)
    outputs.append(None)
    return outputs
# Function to load example audio
def load_example_audio(example_name):
    # Map each example name to an audio file.
    # These are placeholder paths; replace them with actual paths to your example audio files.
    example_paths = {
        "Happy Speech": "examples/happy_speech.wav",
        "Sad Story": "examples/sad_story.wav",
        "Neutral News": "examples/neutral_news.wav"
    }
    # Return the path to the selected example
    return example_paths.get(example_name, "examples/happy_speech.wav")
# Create the Gradio interface with proper output handling
with gr.Blocks(title="Affective Virtual Environments - Chunked Processing") as interface:
    gr.Markdown("# Affective Virtual Environments")
    gr.Markdown("Create an AVE using your voice. Audio is split into chunks, with separate predictions and generations for each segment.")
    with gr.Row():
        with gr.Column(scale=2):
            audio_input = gr.Audio(label="Input Audio", type="filepath", sources=["microphone", "upload"])
            # Add example audio selection
            example_selector = gr.Dropdown(
                label="Select Example Audio",
                choices=["Happy Speech", "Sad Story", "Neutral News"],
                value=None,
                info="Choose from pre-recorded example speeches"
            )
            # Add button to load selected example
            load_example_btn = gr.Button("Load Example", variant="secondary")
        with gr.Column(scale=1):
            # Add chunk duration input
            chunk_duration_input = gr.Number(
                label="Chunk Duration (seconds)",
                value=10,
                minimum=1,
                maximum=60,
                step=1,
                info="Duration of each audio segment to process (1-60 seconds)"
            )
            # Add checkbox for audio generation
            generate_audio_checkbox = gr.Checkbox(
                label="Generate Audio (may take longer)",
                value=True,
                info="Uncheck to skip music generation and speed up processing"
            )
    with gr.Row():
        process_btn = gr.Button("Process Audio", variant="primary")
        clear_btn = gr.Button("Clear All", variant="secondary")
    # Add a loading indicator (hidden by default; % needs no escaping in a plain string)
    loading_indicator = gr.HTML("""
    <div id="loading" style="display: none; text-align: center; margin: 20px;">
        <p style="font-size: 18px; color: #4a4a4a;">Processing audio chunks...</p>
        <div style="border: 4px solid #f3f3f3; border-top: 4px solid #3498db; border-radius: 50%; width: 30px; height: 30px; animation: spin 2s linear infinite; margin: 0 auto;"></div>
        <style>@keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } }</style>
    </div>
    """)
    # Create output components for each chunk type
    output_containers = []
    group_components = []  # Store group components separately
    # We'll create up to 20 chunk slots to accommodate different chunk durations
    for i in range(20):
        with gr.Group(visible=False) as chunk_group:
            gr.Markdown(f"### Chunk {i+1} Results")
            with gr.Row():
                emotion_output = gr.Label(label="Acoustic Emotion Prediction")
                transcription_output = gr.Label(label="Transcribed Text")
                sentiment_output = gr.Label(label="Sentiment Analysis")
            with gr.Row():
                image_output = gr.Image(label="Generated Equirectangular Image")
                audio_output = gr.Audio(label="Generated Music")
            gr.HTML("<hr style='margin: 20px 0; border: 1px solid #ccc;'>")
        group_components.append(chunk_group)  # Store the group component
        output_containers.append({
            'emotion': emotion_output,
            'transcription': transcription_output,
            'sentiment': sentiment_output,
            'image': image_output,
            'music': audio_output
        })
    def process_and_display(audio_input, generate_audio, chunk_duration):
        # Validate chunk duration
        if chunk_duration is None or chunk_duration <= 0:
            chunk_duration = 10
        # Show the loading indicator (literal braces are doubled because this is an f-string)
        yield [gr.HTML(f"""
        <div style="text-align: center; margin: 20px;">
            <p style="font-size: 18px; color: #4a4a4a;">Processing audio in {chunk_duration}-second chunks...</p>
            <div style="border: 4px solid #f3f3f3; border-top: 4px solid #3498db; border-radius: 50%; width: 30px; height: 30px; animation: spin 2s linear infinite; margin: 0 auto;"></div>
            <style>@keyframes spin {{ 0% {{ transform: rotate(0deg); }} 100% {{ transform: rotate(360deg); }} }}</style>
        </div>
        """)] + [gr.Group(visible=False)] * len(group_components) + [None] * (len(output_containers) * 5)
        results = get_predictions(audio_input, generate_audio, chunk_duration)
        # Build the output lists
        outputs = []
        group_visibility = []
        # Process each result
        for i, result in enumerate(results):
            if i < len(output_containers):
                group_visibility.append(gr.Group(visible=True))
                outputs.extend([
                    result['emotion'],
                    result['transcription'],
                    result['sentiment'],
                    result['image'],
                    result['music']
                ])
            else:
                # More results than display slots: the extra chunks are simply not shown,
                # keeping the output count aligned with the registered components
                break
        # Hide remaining containers
        for i in range(len(results), len(output_containers)):
            group_visibility.append(gr.Group(visible=False))
            outputs.extend([None] * 5)
        # Hide loading indicator and show results
        yield [gr.HTML("")] + group_visibility + outputs
    # Function to handle example selection
    def load_example(example_name):
        if not example_name:
            return None, None  # Return None for both audio and example selector
        # Get the path to the example audio file
        example_path = load_example_audio(example_name)
        # Return the example path to update the audio component;
        # the waveform is displayed automatically by Gradio's Audio component
        return example_path, example_name
    # Set up the process button click
    process_btn.click(
        fn=process_and_display,
        inputs=[audio_input, generate_audio_checkbox, chunk_duration_input],
        outputs=[loading_indicator] + group_components + [comp for container in output_containers for comp in [
            container['emotion'],
            container['transcription'],
            container['sentiment'],
            container['image'],
            container['music']
        ]]
    )
    # Set up the clear button
    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=[audio_input] + group_components + [comp for container in output_containers for comp in [
            container['emotion'],
            container['transcription'],
            container['sentiment'],
            container['image'],
            container['music']
        ]] + [loading_indicator, chunk_duration_input, example_selector]
    )
    # Set up the example loading button
    load_example_btn.click(
        fn=load_example,
        inputs=[example_selector],
        outputs=[audio_input, example_selector]
    )
interface.launch()