"""Gradio app that turns speech into an "Affective Virtual Environment".

The input audio is split into fixed-length chunks. For every chunk the app
predicts an acoustic emotion, transcribes the speech, derives a text
sentiment, requests an equirectangular image from DeepAI and (optionally)
generates music with MusicGen.
"""

# Import order is kept as in the original file.
import gradio as gr
import pyvista as pv
from pyvista import examples
import numpy as np
import librosa
import requests
from io import BytesIO
from PIL import Image
import os
from tensorflow.keras.models import load_model
from faster_whisper import WhisperModel
import random
from textblob import TextBlob
import torch
import scipy.io.wavfile
from transformers import AutoProcessor, MusicgenForConditionalGeneration
import tempfile
import base64
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import soundfile as sf
from pydub import AudioSegment
import math
import json
from typing import Any, Dict, List, Optional, Tuple

# Number of per-chunk result panels created in the UI.
MAX_CHUNK_SLOTS = 20
# Components per chunk panel: emotion, transcription, sentiment, image, music.
OUTPUTS_PER_CHUNK = 5
# Size of the plain white image used whenever image generation is unavailable.
FALLBACK_IMAGE_SIZE = (1024, 512)
# Upper bound (seconds) for each HTTP call to DeepAI so the UI cannot hang forever.
REQUEST_TIMEOUT_SECONDS = 120


def _fallback_image():
    """Return the plain white placeholder image."""
    return Image.new('RGB', FALLBACK_IMAGE_SIZE, color='white')


def _remove_files(paths):
    """Best-effort deletion of temporary files; failures are logged, not raised."""
    for path in paths:
        try:
            os.unlink(path)
        except OSError as e:
            print(f"Could not remove temporary file {path}:", e)


# Load the emotion prediction model
def load_emotion_model(model_path):
    """Load the Keras emotion model at *model_path*.

    Returns the loaded model, or ``None`` when loading fails (the error is
    printed).
    """
    try:
        model = load_model(model_path)
        print("Emotion model loaded successfully")
        return model
    except Exception as e:
        print("Error loading emotion prediction model:", e)
        return None


model_path = 'mymodel_SER_LSTM_RAVDESS.h5'
model = load_emotion_model(model_path)

# Initialize WhisperModel
model_size = "small"
model2 = WhisperModel(model_size, device="cpu", compute_type="int8")


# Load MusicGen model
def load_musicgen_model():
    """Load the MusicGen processor and model.

    Returns ``(processor, music_model, device)``; all three are ``None``
    when loading fails (the error is printed).
    """
    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        processor = AutoProcessor.from_pretrained("facebook/musicgen-small")
        music_model = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small")
        music_model.to(device)
        print("MusicGen model loaded successfully")
        return processor, music_model, device
    except Exception as e:
        print("Error loading MusicGen model:", e)
        return None, None, None


processor, music_model, device = load_musicgen_model()


# Function to chunk audio into segments
def chunk_audio(audio_path, chunk_duration=10) -> Tuple[List[str], int]:
    """Split audio into chunks and return list of chunk file paths.

    Args:
        audio_path: Path of the audio file to split.
        chunk_duration: Length of each chunk in seconds; must be positive.

    Returns:
        ``(chunk_files, num_chunks)``. Each chunk is written to its own
        temporary ``.wav`` file that the caller is responsible for deleting.
        When the audio is shorter than one chunk, or when anything goes
        wrong, ``([audio_path], 1)`` is returned so the caller can still
        process the original file.
    """
    chunk_files = []
    try:
        # Validate before doing the (potentially expensive) decode.
        if chunk_duration <= 0:
            raise ValueError("Chunk duration must be positive")

        # Load audio file
        audio = AudioSegment.from_file(audio_path)
        duration_ms = len(audio)
        chunk_ms = chunk_duration * 1000

        if chunk_duration > duration_ms / 1000:
            # If chunk duration is longer than audio, return the whole audio
            return [audio_path], 1

        # Calculate number of chunks
        num_chunks = math.ceil(duration_ms / chunk_ms)

        for i in range(num_chunks):
            start_ms = i * chunk_ms
            end_ms = min((i + 1) * chunk_ms, duration_ms)

            # Extract chunk
            chunk = audio[start_ms:end_ms]

            # Reserve a temp path, then export once our own handle is closed
            # (writing to a still-open NamedTemporaryFile fails on Windows).
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
                chunk_path = tmp_file.name
            chunk_files.append(chunk_path)
            # export() hands back an open file object; close it to avoid
            # leaking one descriptor per chunk.
            chunk.export(chunk_path, format="wav").close()

        return chunk_files, num_chunks
    except Exception as e:
        print("Error chunking audio:", e)
        # Do not leave partially written chunk files behind.
        _remove_files(chunk_files)
        # Return original file as single chunk if chunking fails
        return [audio_path], 1


# Function to transcribe audio
def transcribe(wav_filepath):
    """Transcribe *wav_filepath* with the Whisper model.

    Returns the concatenated segment texts, or ``"Transcription failed"``
    when transcription raises.
    """
    try:
        segments, _ = model2.transcribe(wav_filepath, beam_size=5)
        return "".join(segment.text for segment in segments)
    except Exception as e:
        print("Error transcribing audio:", e)
        return "Transcription failed"


# Function to extract MFCC features from audio
def extract_mfcc(wav_file_name):
    """Return the 40 MFCC coefficients of the file averaged over time.

    Returns ``None`` when loading or feature extraction fails.
    """
    try:
        y, sr = librosa.load(wav_file_name)
        mfccs = np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40).T, axis=0)
        return mfccs
    except Exception as e:
        print("Error extracting MFCC features:", e)
        return None


# Emotions dictionary
emotions = {
    0: 'neutral',
    1: 'calm',
    2: 'happy',
    3: 'sad',
    4: 'angry',
    5: 'fearful',
    6: 'disgust',
    7: 'surprised',
}


# Function to predict emotion from audio
def predict_emotion_from_audio(wav_filepath):
    """Predict the acoustic emotion label for *wav_filepath*.

    Returns one of the values of ``emotions`` on success, otherwise one of
    the status strings ``"Model not loaded"``, ``"Unknown emotion"``,
    ``"Error: Unable to extract features"`` or ``"Prediction error"``.
    """
    try:
        if model is None:
            return "Model not loaded"

        test_point = extract_mfcc(wav_filepath)
        if test_point is None:
            return "Error: Unable to extract features"

        # Shape passed positionally: the ``newshape`` keyword is deprecated
        # in recent NumPy releases.
        test_point = np.reshape(test_point, (1, 40, 1))
        predictions = model.predict(test_point)
        predicted_emotion_label = int(np.argmax(predictions[0]))
        return emotions.get(predicted_emotion_label, "Unknown emotion")
    except Exception as e:
        print("Error predicting emotion:", e)
        return "Prediction error"


# Function to analyze sentiment from text
def analyze_sentiment(text):
    """Classify *text* as positive / negative / neutral using TextBlob.

    Returns ``(sentiment, polarity)``. Polarity above 0.1 is "positive",
    below -0.1 is "negative", anything else "neutral". Empty text and
    errors yield ``("neutral", 0.0)``.
    """
    try:
        if not text or not text.strip():
            return "neutral", 0.0

        polarity = TextBlob(text).sentiment.polarity

        if polarity > 0.1:
            sentiment = "positive"
        elif polarity < -0.1:
            sentiment = "negative"
        else:
            sentiment = "neutral"

        return sentiment, polarity
    except Exception as e:
        print("Error analyzing sentiment:", e)
        return "neutral", 0.0


# Function to get image prompt based on sentiment
def get_image_prompt(sentiment, transcribed_text, chunk_idx, total_chunks):
    """Build the text-to-image prompt for one chunk.

    *chunk_idx* is zero-based; the prompt shows it one-based. Any sentiment
    other than "positive" or "negative" uses the neutral wording.
    """
    base_prompt = f"Chunk {chunk_idx+1}/{total_chunks}: "

    if sentiment == "positive":
        return base_prompt + (
            "Generate a vibrant, uplifting equirectangular 360 image texture "
            "with bright colors, joyful atmosphere, and optimistic vibes "
            f"representing: [{transcribed_text}]. "
            "The scene should evoke happiness and positivity."
        )
    if sentiment == "negative":
        return base_prompt + (
            "Generate a moody, dramatic equirectangular 360 image texture "
            "with dark tones, intense atmosphere, and emotional depth "
            f"representing: [{transcribed_text}]. "
            "The scene should convey melancholy and intensity."
        )
    # neutral
    return base_prompt + (
        "Generate a balanced, serene equirectangular 360 image texture "
        "with harmonious colors, peaceful atmosphere, and calm vibes "
        f"representing: [{transcribed_text}]. "
        "The scene should evoke tranquility and balance."
    )


# Function to get music prompt based on emotion
def get_music_prompt(emotion, transcribed_text, chunk_idx, total_chunks):
    """Build the MusicGen prompt for one chunk.

    *emotion* is matched case-insensitively against the known emotion
    names; unknown values fall back to a generic prompt that embeds the
    given emotion string.
    """
    base_prompt = f"Chunk {chunk_idx+1}/{total_chunks}: "

    emotion_prompts = {
        'neutral': (
            "Create ambient, background music with neutral tones, subtle melodies, "
            f"and unobtrusive atmosphere that complements: {transcribed_text}. "
            "The music should be calm and balanced."
        ),
        'calm': (
            "Generate soothing, peaceful music with gentle melodies, soft instrumentation, "
            f"and relaxing vibes that represents: {transcribed_text}. "
            "The music should evoke tranquility and serenity."
        ),
        'happy': (
            "Create joyful, upbeat music with cheerful melodies, bright instrumentation, "
            f"and energetic rhythms that celebrates: {transcribed_text}. "
            "The music should evoke happiness and positivity."
        ),
        'sad': (
            "Generate emotional, melancholic music with poignant melodies, soft strings, "
            f"and heartfelt atmosphere that reflects: {transcribed_text}. "
            "The music should evoke sadness and reflection."
        ),
        'angry': (
            "Create intense, powerful music with driving rhythms, aggressive instrumentation, "
            f"and strong dynamics that expresses: {transcribed_text}. "
            "The music should evoke anger and intensity."
        ),
        'fearful': (
            "Generate suspenseful, tense music with eerie melodies, atmospheric sounds, "
            f"and unsettling vibes that represents: {transcribed_text}. "
            "The music should evoke fear and anticipation."
        ),
        'disgust': (
            "Create dark, unsettling music with dissonant harmonies, unusual sounds, "
            f"and uncomfortable atmosphere that reflects: {transcribed_text}. "
            "The music should evoke discomfort and unease."
        ),
        'surprised': (
            "Generate dynamic, unexpected music with sudden changes, playful melodies, "
            f"and surprising elements that represents: {transcribed_text}. "
            "The music should evoke surprise and wonder."
        ),
    }

    return base_prompt + emotion_prompts.get(
        emotion.lower(),
        f"Create background music with {emotion} atmosphere that represents: {transcribed_text}",
    )


# Function to generate music with MusicGen (using acoustic emotion prediction)
def generate_music(transcribed_text, emotion_prediction, chunk_idx, total_chunks):
    """Generate music for one chunk and return the path of a temp ``.wav``.

    Returns ``None`` when MusicGen is not loaded or generation fails. The
    returned file is not deleted by this function.
    """
    try:
        if processor is None or music_model is None:
            return None

        # Get specific prompt based on emotion
        prompt = get_music_prompt(emotion_prediction, transcribed_text, chunk_idx, total_chunks)

        # Limit prompt length to avoid model issues
        if len(prompt) > 200:
            prompt = prompt[:200] + "..."

        inputs = processor(
            text=[prompt],
            padding=True,
            return_tensors="pt",
        ).to(device)

        # Generate audio
        audio_values = music_model.generate(**inputs, max_new_tokens=512)

        # Convert to numpy array and sample rate
        sampling_rate = music_model.config.audio_encoder.sampling_rate
        audio_data = audio_values[0, 0].cpu().numpy()

        # Peak-normalize; skip when the output is pure silence, otherwise
        # the division by zero would fill the file with NaNs.
        peak = np.max(np.abs(audio_data))
        if peak > 0:
            audio_data = audio_data / peak

        # Reserve a temp path, then write after our handle is closed.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            music_path = tmp_file.name
        scipy.io.wavfile.write(music_path, rate=sampling_rate, data=audio_data)
        return music_path

    except Exception as e:
        print("Error generating music:", e)
        return None


# --- DeepAI Image Generation (Text2Img) ---
api_key = os.getenv("DeepAI_api_key")


def generate_image(sentiment_prediction, transcribed_text, chunk_idx, total_chunks):
    """Request an image from the DeepAI text2img API for one chunk.

    Returns a PIL image. A plain white 1024x512 image is returned when no
    API key is configured, when the response has no ``output_url``, or
    when any step raises (including a request timeout).
    """
    try:
        if not api_key:
            # fallback white image if no API key
            return _fallback_image()

        # Get specific prompt based on sentiment
        prompt = get_image_prompt(sentiment_prediction, transcribed_text, chunk_idx, total_chunks)

        # Make request to DeepAI text2img API
        response = requests.post(
            "https://api.deepai.org/api/text2img",
            data={
                'text': prompt,
                'width': 1024,
                'height': 512,
                'image_generator_version': 'hd'
            },
            headers={'api-key': api_key},
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

        data = response.json()
        if 'output_url' in data:
            # Download the generated image
            img_resp = requests.get(data['output_url'], timeout=REQUEST_TIMEOUT_SECONDS)
            return Image.open(BytesIO(img_resp.content))

        print("Error in DeepAI response:", data)
        # Return a fallback image
        return _fallback_image()

    except Exception as e:
        print("Error generating image:", e)
        # Return a fallback image
        return _fallback_image()


# Function to process a single chunk
def process_chunk(chunk_path, chunk_idx, total_chunks, generate_audio=True) -> Dict[str, Any]:
    """Run the full pipeline on one chunk.

    Returns a dict with the keys ``chunk_index`` (one-based), ``emotion``,
    ``transcription``, ``sentiment``, ``image`` and ``music``. ``music`` is
    ``None`` when *generate_audio* is false or generation failed. On an
    unexpected error a placeholder dict with the same keys is returned.
    """
    try:
        # Get acoustic emotion prediction (for music)
        emotion_prediction = predict_emotion_from_audio(chunk_path)

        # Get transcribed text
        transcribed_text = transcribe(chunk_path)

        # Analyze sentiment of transcribed text (for image)
        sentiment, polarity = analyze_sentiment(transcribed_text)

        # Generate image using SENTIMENT analysis with specific prompt
        image = generate_image(sentiment, transcribed_text, chunk_idx, total_chunks)

        # Generate music only if audio generation is enabled
        music_path = None
        if generate_audio:
            music_path = generate_music(transcribed_text, emotion_prediction, chunk_idx, total_chunks)

        return {
            'chunk_index': chunk_idx + 1,
            'emotion': emotion_prediction,
            'transcription': transcribed_text,
            'sentiment': sentiment,
            'image': image,
            'music': music_path
        }
    except Exception as e:
        print(f"Error processing chunk {chunk_idx + 1}:", e)
        # Return a fallback result with all required keys
        return {
            'chunk_index': chunk_idx + 1,
            'emotion': "Error",
            'transcription': "Transcription failed",
            'sentiment': "Sentiment: error",
            'image': _fallback_image(),
            'music': None
        }


# Function to get predictions for all chunks
def get_predictions(audio_input, generate_audio=True, chunk_duration=10,
                    max_chunks: Optional[int] = None) -> List[Dict[str, Any]]:
    """Chunk *audio_input* and process every chunk.

    Args:
        audio_input: Path of the audio file; ``None``/empty yields ``[]``.
        generate_audio: Whether to generate music for each chunk.
        chunk_duration: Chunk length in seconds.
        max_chunks: When given, only the first *max_chunks* chunks are
            processed (all temporary chunk files are still removed).

    Returns:
        One result dict per processed chunk, in order (see
        ``process_chunk``).
    """
    if not audio_input:
        # Nothing recorded/uploaded: skip the whole pipeline instead of
        # running every model on a missing file.
        print("No audio input provided")
        return []

    # Chunk the audio into segments
    chunk_files, total_chunks = chunk_audio(audio_input, chunk_duration)
    to_process = chunk_files if max_chunks is None else chunk_files[:max_chunks]

    results = []
    try:
        # Process each chunk
        for i, chunk_path in enumerate(to_process):
            print(f"Processing chunk {i+1}/{total_chunks} ({chunk_duration}s each)")
            results.append(process_chunk(chunk_path, i, total_chunks, generate_audio))
    finally:
        # Clean up temporary chunk files; never delete the original input file.
        _remove_files(path for path in chunk_files if path != audio_input)

    return results


# Function to clear audio input and all outputs
def clear_all():
    """Return reset values for every component wired to the clear button.

    Order: audio input, one hidden group per chunk panel, ``None`` for each
    per-chunk output component, empty loading indicator, chunk duration
    reset to 10, example selector reset to ``None``.
    """
    # For audio input
    outputs = [None]

    # For group components (set to invisible)
    outputs.extend([gr.Group(visible=False)] * len(group_components))

    # For all output containers (set to None)
    outputs.extend([None] * (len(output_containers) * OUTPUTS_PER_CHUNK))

    # For loading indicator (empty HTML)
    outputs.append(gr.HTML(""))

    # For chunk duration (reset to 10)
    outputs.append(10)

    # For example selector (reset to None)
    outputs.append(None)

    return outputs


# Function to load example audio
def load_example_audio(example_name):
    """Map an example name to its audio file path.

    Unknown names fall back to the "Happy Speech" path.
    """
    # NOTE: placeholder paths - replace these with the actual paths to your
    # example audio files.
    example_paths = {
        "Happy Speech": "examples/happy_speech.wav",
        "Sad Story": "examples/sad_story.wav",
        "Neutral News": "examples/neutral_news.wav"
    }

    # Return the path to the selected example
    return example_paths.get(example_name, "examples/happy_speech.wav")


# Create the Gradio interface with proper output handling
# NOTE(review): the source arrived with its indentation (and apparently the
# HTML markup inside the gr.HTML strings) stripped; the layout nesting below
# is a reconstruction - confirm against the intended design.
with gr.Blocks(title="Affective Virtual Environments - Chunked Processing") as interface:
    gr.Markdown("# Affective Virtual Environments")
    gr.Markdown("Create an AVE using your voice. Audio is split into chunks, with separate predictions and generations for each segment.")

    with gr.Row():
        with gr.Column(scale=2):
            audio_input = gr.Audio(label="Input Audio", type="filepath", sources=["microphone", "upload"])

            # Add example audio selection
            example_selector = gr.Dropdown(
                label="Select Example Audio",
                choices=["Happy Speech", "Sad Story", "Neutral News"],
                value=None,
                info="Choose from pre-recorded example speeches"
            )

            # Add button to load selected example
            load_example_btn = gr.Button("Load Example", variant="secondary")

        with gr.Column(scale=1):
            # Add chunk duration input
            chunk_duration_input = gr.Number(
                label="Chunk Duration (seconds)",
                value=10,
                minimum=1,
                maximum=60,
                step=1,
                info="Duration of each audio segment to process (1-60 seconds)"
            )

            # Add checkbox for audio generation
            generate_audio_checkbox = gr.Checkbox(
                label="Generate Audio (may take longer)",
                value=True,
                info="Uncheck to skip music generation and speed up processing"
            )

    with gr.Row():
        process_btn = gr.Button("Process Audio", variant="primary")
        clear_btn = gr.Button("Clear All", variant="secondary")

    # Add a loading indicator
    loading_indicator = gr.HTML(""" """)

    # Create output components for each chunk type
    output_containers = []
    group_components = []  # Store group components separately

    # A fixed number of chunk slots accommodates different chunk durations
    for i in range(MAX_CHUNK_SLOTS):
        with gr.Group(visible=False) as chunk_group:
            gr.Markdown(f"### Chunk {i+1} Results")
            with gr.Row():
                emotion_output = gr.Label(label="Acoustic Emotion Prediction")
                transcription_output = gr.Label(label="Transcribed Text")
                sentiment_output = gr.Label(label="Sentiment Analysis")

            with gr.Row():
                image_output = gr.Image(label="Generated Equirectangular Image")
                audio_output = gr.Audio(label="Generated Music")

            gr.HTML("\n")

        group_components.append(chunk_group)  # Store the group component
        output_containers.append({
            'emotion': emotion_output,
            'transcription': transcription_output,
            'sentiment': sentiment_output,
            'image': image_output,
            'music': audio_output
        })

    # Flat list of every per-chunk output component, in the order the
    # handlers emit values for them.
    chunk_output_components = [
        container[key]
        for container in output_containers
        for key in ('emotion', 'transcription', 'sentiment', 'image', 'music')
    ]

    def process_and_display(audio_input, generate_audio, chunk_duration):
        """Generator handler for the "Process Audio" button.

        First yields a "processing" state with every chunk panel hidden,
        then yields the final state with one visible panel per result.
        Each yielded list is ordered: loading indicator, chunk groups,
        per-chunk output components.
        """
        # Validate chunk duration
        if chunk_duration is None or chunk_duration <= 0:
            chunk_duration = 10

        slot_count = len(output_containers)

        # Show loading indicator
        yield (
            [gr.HTML(f"\n\nProcessing audio in {chunk_duration}-second chunks...\n\n")]
            + [gr.Group(visible=False)] * len(group_components)
            + [None] * (slot_count * OUTPUTS_PER_CHUNK)
        )

        results = get_predictions(audio_input, generate_audio, chunk_duration, max_chunks=slot_count)

        # Never emit more values than there are output components: Gradio
        # rejects a handler result whose length differs from ``outputs``.
        results = results[:slot_count]

        group_visibility = []
        outputs = []

        # Process each result
        for result in results:
            group_visibility.append(gr.Group(visible=True))
            outputs.extend([
                result['emotion'],
                result['transcription'],
                result['sentiment'],
                result['image'],
                result['music']
            ])

        # Hide remaining containers
        remaining = slot_count - len(results)
        group_visibility.extend(gr.Group(visible=False) for _ in range(remaining))
        outputs.extend([None] * (remaining * OUTPUTS_PER_CHUNK))

        # Hide loading indicator and show results
        yield [gr.HTML("")] + group_visibility + outputs

    # Function to handle example selection
    def load_example(example_name):
        """Return ``(audio_path, example_name)`` for the chosen example.

        Returns ``(None, None)`` when nothing is selected.
        """
        if not example_name:
            return None, None  # Return None for both audio and example selector

        # Get the path to the example audio file
        example_path = load_example_audio(example_name)

        # Return the example path to update the audio component
        return example_path, example_name

    # Set up the button click
    process_btn.click(
        fn=process_and_display,
        inputs=[audio_input, generate_audio_checkbox, chunk_duration_input],
        outputs=[loading_indicator] + group_components + chunk_output_components
    )

    # Set up the clear button
    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=(
            [audio_input]
            + group_components
            + chunk_output_components
            + [loading_indicator]
            + [chunk_duration_input]
            + [example_selector]
        )
    )

    # Set up the example loading button
    load_example_btn.click(
        fn=load_example,
        inputs=[example_selector],
        outputs=[audio_input, example_selector]
    )

interface.launch()