# Speech2Scene / app.py
# jfforero's picture
# Update app.py
# fe6ce3a verified
import gradio as gr
import pyvista as pv
from pyvista import examples
import numpy as np
import librosa
import requests
from io import BytesIO
from PIL import Image
import os
from tensorflow.keras.models import load_model
from faster_whisper import WhisperModel
import random
from textblob import TextBlob
import torch
import scipy.io.wavfile
from transformers import AutoProcessor, MusicgenForConditionalGeneration
import tempfile
import base64
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import soundfile as sf
from pydub import AudioSegment
import math
import json
# Load the emotion prediction model
def load_emotion_model(model_path):
    """Load the emotion-recognition model stored at *model_path*.

    Any failure is reported on stdout and turned into a ``None`` return so
    the application can still start without the model.
    """
    loaded = None
    try:
        loaded = load_model(model_path)
        print("Emotion model loaded successfully")
    except Exception as err:
        print("Error loading emotion prediction model:", err)
        loaded = None
    return loaded
# File name of the saved Keras emotion model; it is loaded once, at import time.
model_path = 'mymodel_SER_LSTM_RAVDESS.h5'
# `model` is None when loading failed (load_emotion_model swallows errors).
model = load_emotion_model(model_path)
# Initialize the faster-whisper speech-to-text model ("small" size) on CPU
# with int8 compute. Unlike the other models this load is not guarded, so a
# failure here raises at import time.
model_size = "small"
model2 = WhisperModel(model_size, device="cpu", compute_type="int8")
# Load MusicGen model
def load_musicgen_model():
    """Load the MusicGen processor and model and pick the device to run on.

    Returns a ``(processor, music_model, device)`` tuple, where ``device`` is
    ``"cuda"`` when CUDA is available and ``"cpu"`` otherwise. If anything
    goes wrong the error is printed and ``(None, None, None)`` is returned.
    """
    checkpoint = "facebook/musicgen-small"
    try:
        if torch.cuda.is_available():
            target_device = "cuda"
        else:
            target_device = "cpu"
        text_processor = AutoProcessor.from_pretrained(checkpoint)
        generator = MusicgenForConditionalGeneration.from_pretrained(checkpoint)
        generator.to(target_device)
        print("MusicGen model loaded successfully")
        return text_processor, generator, target_device
    except Exception as err:
        print("Error loading MusicGen model:", err)
        return None, None, None
# Load MusicGen once at import time; all three names are None if loading failed.
processor, music_model, device = load_musicgen_model()
# Function to chunk audio into segments
def chunk_audio(audio_path, chunk_duration=10):
    """Split an audio file into consecutive WAV chunks of *chunk_duration* seconds.

    Args:
        audio_path: Path of the audio file to split.
        chunk_duration: Length of each chunk in seconds; must be positive.

    Returns:
        A ``(chunk_files, num_chunks)`` tuple. ``chunk_files`` is a list of
        temporary WAV file paths that the caller is responsible for deleting.
        When the audio is shorter than one chunk, or when anything goes wrong
        (invalid duration, unreadable file, export failure), the original
        file is returned as the only chunk: ``([audio_path], 1)``.
    """
    try:
        # Validate before decoding so a bad duration never costs a file load.
        if chunk_duration <= 0:
            raise ValueError("Chunk duration must be positive")

        audio = AudioSegment.from_file(audio_path)
        duration_ms = len(audio)
        chunk_ms = chunk_duration * 1000

        if chunk_duration > duration_ms / 1000:
            # Chunk is longer than the whole recording: nothing to split.
            return [audio_path], 1

        num_chunks = math.ceil(duration_ms / chunk_ms)
        chunk_files = []
        try:
            for i in range(num_chunks):
                start_ms = i * chunk_ms
                end_ms = min((i + 1) * chunk_ms, duration_ms)

                # Reserve a path, then close the handle before exporting:
                # writing to a still-open NamedTemporaryFile fails on Windows.
                tmp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
                tmp_file.close()
                chunk_files.append(tmp_file.name)

                audio[start_ms:end_ms].export(tmp_file.name, format="wav")
        except Exception:
            # The caller will fall back to the original file, so the partial
            # chunks written so far would otherwise be leaked.
            for leftover in chunk_files:
                try:
                    os.unlink(leftover)
                except OSError:
                    pass
            raise

        return chunk_files, num_chunks
    except Exception as e:
        print("Error chunking audio:", e)
        # Return original file as single chunk if chunking fails
        return [audio_path], 1
# Function to transcribe audio
def transcribe(wav_filepath):
    """Transcribe the audio file at *wav_filepath* with the Whisper model.

    Returns the concatenated text of all segments, or the string
    ``"Transcription failed"`` if transcription raises.
    """
    try:
        segments, _info = model2.transcribe(wav_filepath, beam_size=5)
        pieces = []
        for segment in segments:
            pieces.append(segment.text)
        return "".join(pieces)
    except Exception as err:
        print("Error transcribing audio:", err)
        return "Transcription failed"
# Function to extract MFCC features from audio
def extract_mfcc(wav_file_name):
    """Return the 40 MFCC coefficients of an audio file, averaged over time.

    Returns ``None`` (after printing the error) when the file cannot be
    loaded or the features cannot be computed.
    """
    try:
        signal, sample_rate = librosa.load(wav_file_name)
        coefficients = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=40)
        # Transpose so that axis 0 is the frame axis, then average it away.
        return np.mean(coefficients.T, axis=0)
    except Exception as err:
        print("Error extracting MFCC features:", err)
        return None
# Maps the emotion model's output class index to its label
emotions = dict(enumerate((
    'neutral',
    'calm',
    'happy',
    'sad',
    'angry',
    'fearful',
    'disgust',
    'surprised',
)))
# Function to predict emotion from audio
def predict_emotion_from_audio(wav_filepath):
    """Predict the acoustic emotion label for an audio file.

    Returns one of the labels in ``emotions`` on success. On failure it
    returns a descriptive string instead of raising: ``"Model not loaded"``,
    ``"Error: Unable to extract features"``, ``"Unknown emotion"`` (class
    index not in ``emotions``) or ``"Prediction error"``.
    """
    try:
        if model is None:
            return "Model not loaded"

        features = extract_mfcc(wav_filepath)
        if features is None:
            return "Error: Unable to extract features"

        # Shape passed positionally: the `newshape` keyword is deprecated
        # since NumPy 2.1, while the positional form works on every version.
        batch = np.reshape(features, (1, 40, 1))
        predictions = model.predict(batch)

        # Convert the NumPy integer to a plain int before the dict lookup.
        label_index = int(np.argmax(predictions[0]))
        return emotions.get(label_index, "Unknown emotion")
    except Exception as e:
        print("Error predicting emotion:", e)
        return "Prediction error"
# Function to analyze sentiment from text
def analyze_sentiment(text):
    """Classify *text* as positive, negative or neutral using TextBlob polarity.

    Returns a ``(label, polarity)`` tuple. Polarity above 0.1 is "positive",
    below -0.1 is "negative", anything else is "neutral". Empty or blank
    text, and any analysis error, yield ``("neutral", 0.0)``.
    """
    try:
        if text and text.strip() != "":
            polarity = TextBlob(text).sentiment.polarity
            if polarity > 0.1:
                return "positive", polarity
            if polarity < -0.1:
                return "negative", polarity
            return "neutral", polarity
        return "neutral", 0.0
    except Exception as err:
        print("Error analyzing sentiment:", err)
        return "neutral", 0.0
# Function to get image prompt based on sentiment
def get_image_prompt(sentiment, transcribed_text, chunk_idx, total_chunks):
    """Build the text-to-image prompt for one chunk.

    The wording depends on *sentiment* ("positive", "negative", anything
    else is treated as neutral). The prompt is prefixed with the 1-based
    chunk position and embeds *transcribed_text* in square brackets.
    """
    if sentiment == "positive":
        opening = "Generate a vibrant, uplifting equirectangular 360 image texture with bright colors, joyful atmosphere, and optimistic vibes representing: ["
        closing = "]. The scene should evoke happiness and positivity."
    elif sentiment == "negative":
        opening = "Generate a moody, dramatic equirectangular 360 image texture with dark tones, intense atmosphere, and emotional depth representing: ["
        closing = "]. The scene should convey melancholy and intensity."
    else:
        # Neutral wording doubles as the default for unknown sentiments.
        opening = "Generate a balanced, serene equirectangular 360 image texture with harmonious colors, peaceful atmosphere, and calm vibes representing: ["
        closing = "]. The scene should evoke tranquility and balance."

    header = f"Chunk {chunk_idx+1}/{total_chunks}: "
    return header + f"{opening}{transcribed_text}{closing}"
# Function to get music prompt based on emotion
def get_music_prompt(emotion, transcribed_text, chunk_idx, total_chunks):
    """Build the MusicGen text prompt for one chunk.

    *emotion* is matched case-insensitively against the eight known labels;
    any other value falls back to a generic prompt that mentions the value
    verbatim. The prompt is prefixed with the 1-based chunk position.
    """
    # Each entry is (text before the transcription, text after it).
    templates = {
        'neutral': (
            "Create ambient, background music with neutral tones, subtle melodies, and unobtrusive atmosphere that complements: ",
            ". The music should be calm and balanced.",
        ),
        'calm': (
            "Generate soothing, peaceful music with gentle melodies, soft instrumentation, and relaxing vibes that represents: ",
            ". The music should evoke tranquility and serenity.",
        ),
        'happy': (
            "Create joyful, upbeat music with cheerful melodies, bright instrumentation, and energetic rhythms that celebrates: ",
            ". The music should evoke happiness and positivity.",
        ),
        'sad': (
            "Generate emotional, melancholic music with poignant melodies, soft strings, and heartfelt atmosphere that reflects: ",
            ". The music should evoke sadness and reflection.",
        ),
        'angry': (
            "Create intense, powerful music with driving rhythms, aggressive instrumentation, and strong dynamics that expresses: ",
            ". The music should evoke anger and intensity.",
        ),
        'fearful': (
            "Generate suspenseful, tense music with eerie melodies, atmospheric sounds, and unsettling vibes that represents: ",
            ". The music should evoke fear and anticipation.",
        ),
        'disgust': (
            "Create dark, unsettling music with dissonant harmonies, unusual sounds, and uncomfortable atmosphere that reflects: ",
            ". The music should evoke discomfort and unease.",
        ),
        'surprised': (
            "Generate dynamic, unexpected music with sudden changes, playful melodies, and surprising elements that represents: ",
            ". The music should evoke surprise and wonder.",
        ),
    }

    header = f"Chunk {chunk_idx+1}/{total_chunks}: "
    parts = templates.get(emotion.lower())
    if parts is None:
        return header + f"Create background music with {emotion} atmosphere that represents: {transcribed_text}"

    lead, tail = parts
    return header + f"{lead}{transcribed_text}{tail}"
# Function to generate music with MusicGen (using acoustic emotion prediction)
def generate_music(transcribed_text, emotion_prediction, chunk_idx, total_chunks):
    """Generate a short music clip for one chunk with MusicGen.

    The prompt is derived from the acoustic emotion prediction and the
    transcription, and is cut to 200 characters (plus "...") before being
    sent to the model.

    Returns:
        Path of a temporary WAV file holding the generated audio (the caller
        owns the file), or ``None`` when MusicGen is not loaded or generation
        fails.
    """
    try:
        if processor is None or music_model is None:
            return None

        prompt = get_music_prompt(emotion_prediction, transcribed_text, chunk_idx, total_chunks)

        # Limit prompt length to avoid model issues
        if len(prompt) > 200:
            prompt = prompt[:200] + "..."

        inputs = processor(
            text=[prompt],
            padding=True,
            return_tensors="pt",
        ).to(device)

        audio_values = music_model.generate(**inputs, max_new_tokens=512)

        sampling_rate = music_model.config.audio_encoder.sampling_rate
        audio_data = audio_values[0, 0].cpu().numpy()

        # Peak-normalize, but leave an all-zero (silent) signal untouched:
        # dividing by a zero peak would fill the file with NaN.
        peak = np.max(np.abs(audio_data))
        if peak > 0:
            audio_data = audio_data / peak

        # Reserve the path and close the handle before writing, so the file
        # is not opened twice (which fails on Windows).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
            output_path = tmp_file.name
        scipy.io.wavfile.write(output_path, rate=sampling_rate, data=audio_data)
        return output_path
    except Exception as e:
        print("Error generating music:", e)
        return None
# --- DeepAI Image Generation (Text2Img) ---
# The key is read from the DeepAI_api_key environment variable; it is None when unset.
api_key = os.getenv("DeepAI_api_key")
def generate_image(sentiment_prediction, transcribed_text, chunk_idx, total_chunks):
    """Generate an equirectangular image for one chunk via the DeepAI text2img API.

    Returns:
        A PIL image. A plain white 1024x512 image is returned instead when
        no API key is configured, when the API response carries no
        ``output_url``, or when any request fails or times out.
    """
    def fallback_image():
        return Image.new('RGB', (1024, 512), color='white')

    try:
        if not api_key:
            # fallback white image if no API key
            return fallback_image()

        prompt = get_image_prompt(sentiment_prediction, transcribed_text, chunk_idx, total_chunks)

        # Timeouts keep a stalled API call from blocking the UI forever.
        response = requests.post(
            "https://api.deepai.org/api/text2img",
            data={
                'text': prompt,
                'width': 1024,
                'height': 512,
                'image_generator_version': 'hd'
            },
            headers={'api-key': api_key},
            timeout=120,
        )
        data = response.json()

        if 'output_url' not in data:
            print("Error in DeepAI response:", data)
            return fallback_image()

        # Download the generated image; fail loudly on an HTTP error instead
        # of handing an error page to the image decoder.
        img_resp = requests.get(data['output_url'], timeout=60)
        img_resp.raise_for_status()
        return Image.open(BytesIO(img_resp.content))
    except Exception as e:
        print("Error generating image:", e)
        return fallback_image()
# Function to process a single chunk
def process_chunk(chunk_path, chunk_idx, total_chunks, generate_audio=True):
    """Run the full pipeline on one audio chunk.

    The acoustic emotion drives the music prompt, and the sentiment of the
    transcription drives the image prompt. Music is generated only when
    *generate_audio* is true.

    Returns a dict with the keys ``chunk_index`` (1-based), ``emotion``,
    ``transcription``, ``sentiment``, ``image`` and ``music``. If anything
    raises, a placeholder dict with the same keys is returned.
    """
    chunk_number = chunk_idx + 1
    try:
        emotion_label = predict_emotion_from_audio(chunk_path)
        text = transcribe(chunk_path)
        sentiment_label, _polarity = analyze_sentiment(text)
        scene_image = generate_image(sentiment_label, text, chunk_idx, total_chunks)
        music_file = (
            generate_music(text, emotion_label, chunk_idx, total_chunks)
            if generate_audio
            else None
        )
        return {
            'chunk_index': chunk_number,
            'emotion': emotion_label,
            'transcription': text,
            'sentiment': sentiment_label,
            'image': scene_image,
            'music': music_file
        }
    except Exception as err:
        print(f"Error processing chunk {chunk_idx + 1}:", err)
        # Placeholder result carrying every key the UI expects.
        return {
            'chunk_index': chunk_number,
            'emotion': "Error",
            'transcription': "Transcription failed",
            'sentiment': "Sentiment: error",
            'image': Image.new('RGB', (1024, 512), color='white'),
            'music': None
        }
# Function to get predictions for all chunks
def get_predictions(audio_input, generate_audio=True, chunk_duration=10):
    """Split *audio_input* into chunks and process each one in order.

    Args:
        audio_input: Path of the audio file to analyse.
        generate_audio: Whether to generate music for each chunk.
        chunk_duration: Chunk length in seconds.

    Returns:
        A list with one result dict per chunk, as produced by
        ``process_chunk``.
    """
    chunk_files, total_chunks = chunk_audio(audio_input, chunk_duration)
    results = []
    try:
        for i, chunk_path in enumerate(chunk_files):
            print(f"Processing chunk {i+1}/{total_chunks} ({chunk_duration}s each)")
            results.append(process_chunk(chunk_path, i, total_chunks, generate_audio))
    finally:
        # Remove the temporary chunk files even if processing raised.
        for chunk_path in chunk_files:
            if chunk_path == audio_input:
                # Don't delete original input file
                continue
            try:
                os.unlink(chunk_path)
            except OSError as e:
                # Best-effort cleanup: report the problem and carry on.
                print(f"Warning: could not remove temporary chunk {chunk_path}:", e)
    return results
# Function to clear audio input and all outputs
def clear_all():
    """Return the values that reset every component wired to the Clear button.

    Order: audio input (cleared), one hidden group per chunk slot, five
    empty values per chunk slot, the loading indicator (emptied), the chunk
    duration (back to 10) and the example selector (cleared).
    """
    hidden_groups = [gr.Group(visible=False)] * len(group_components)
    blank_fields = [None] * (len(output_containers) * 5)
    return [None, *hidden_groups, *blank_fields, gr.HTML(""), 10, None]
# Function to load example audio
def load_example_audio(example_name):
    """Return the file path of the example recording named *example_name*.

    Unknown names fall back to the "Happy Speech" example. The paths are
    placeholders and must point at real files for the examples to load.
    """
    catalog = {
        "Happy Speech": "examples/happy_speech.wav",
        "Sad Story": "examples/sad_story.wav",
        "Neutral News": "examples/neutral_news.wav"
    }
    try:
        return catalog[example_name]
    except KeyError:
        return "examples/happy_speech.wav"
# Create the Gradio interface with proper output handling
with gr.Blocks(title="Affective Virtual Environments - Chunked Processing") as interface:
    gr.Markdown("# Affective Virtual Environments")
    gr.Markdown("Create an AVE using your voice. Audio is split into chunks, with separate predictions and generations for each segment.")

    with gr.Row():
        with gr.Column(scale=2):
            audio_input = gr.Audio(label="Input Audio", type="filepath", sources=["microphone", "upload"])

            # Example audio selection
            example_selector = gr.Dropdown(
                label="Select Example Audio",
                choices=["Happy Speech", "Sad Story", "Neutral News"],
                value=None,
                info="Choose from pre-recorded example speeches"
            )
            load_example_btn = gr.Button("Load Example", variant="secondary")

        with gr.Column(scale=1):
            chunk_duration_input = gr.Number(
                label="Chunk Duration (seconds)",
                value=10,
                minimum=1,
                maximum=60,
                step=1,
                info="Duration of each audio segment to process (1-60 seconds)"
            )
            generate_audio_checkbox = gr.Checkbox(
                label="Generate Audio (may take longer)",
                value=True,
                info="Uncheck to skip music generation and speed up processing"
            )

    with gr.Row():
        process_btn = gr.Button("Process Audio", variant="primary")
        clear_btn = gr.Button("Clear All", variant="secondary")

    # Loading indicator, hidden until processing starts. This is a plain
    # string (no %-formatting), so the keyframe percentages must be written
    # as single "%" signs: "0%%" is not a valid CSS keyframe selector.
    loading_indicator = gr.HTML("""
    <div id="loading" style="display: none; text-align: center; margin: 20px;">
        <p style="font-size: 18px; color: #4a4a4a;">Processing audio chunks...</p>
        <div style="border: 4px solid #f3f3f3; border-top: 4px solid #3498db; border-radius: 50%; width: 30px; height: 30px; animation: spin 2s linear infinite; margin: 0 auto;"></div>
        <style>@keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } }</style>
    </div>
    """)

    # One hidden result group per chunk slot. The number of slots is fixed
    # when the UI is built, so at most 20 chunks can be displayed.
    output_containers = []
    group_components = []  # Group wrappers, toggled visible per chunk
    for i in range(20):
        with gr.Group(visible=False) as chunk_group:
            gr.Markdown(f"### Chunk {i+1} Results")
            with gr.Row():
                emotion_output = gr.Label(label="Acoustic Emotion Prediction")
                transcription_output = gr.Label(label="Transcribed Text")
                sentiment_output = gr.Label(label="Sentiment Analysis")
            with gr.Row():
                image_output = gr.Image(label="Generated Equirectangular Image")
                audio_output = gr.Audio(label="Generated Music")
            gr.HTML("<hr style='margin: 20px 0; border: 1px solid #ccc;'>")
        group_components.append(chunk_group)
        output_containers.append({
            'emotion': emotion_output,
            'transcription': transcription_output,
            'sentiment': sentiment_output,
            'image': image_output,
            'music': audio_output
        })

    def process_and_display(audio_input, generate_audio, chunk_duration):
        """Process the audio and stream UI updates.

        Yields twice: first a loading state with every result group hidden,
        then the final state. Each yielded list has exactly one value per
        wired output: the loading indicator, one group per chunk slot, and
        five fields per chunk slot.
        """
        slot_count = len(output_containers)
        hidden_groups = [gr.Group(visible=False)] * slot_count
        blank_fields = [None] * (slot_count * 5)

        # Fall back to the default when the number box is empty or invalid.
        if chunk_duration is None or chunk_duration <= 0:
            chunk_duration = 10

        # Without audio there is nothing to analyse; skip the (slow, paid)
        # generation pipeline instead of running it on a missing file.
        if not audio_input:
            yield [gr.HTML(
                "<p style='text-align: center; font-size: 18px; color: #4a4a4a;'>"
                "Please record or upload audio before processing.</p>"
            )] + hidden_groups + blank_fields
            return

        # Show the loading indicator. In an f-string only braces need
        # doubling; percent signs are written as-is.
        yield [gr.HTML(f"""
        <div style="text-align: center; margin: 20px;">
            <p style="font-size: 18px; color: #4a4a4a;">Processing audio in {chunk_duration:g}-second chunks...</p>
            <div style="border: 4px solid #f3f3f3; border-top: 4px solid #3498db; border-radius: 50%; width: 30px; height: 30px; animation: spin 2s linear infinite; margin: 0 auto;"></div>
            <style>@keyframes spin {{ 0% {{ transform: rotate(0deg); }} 100% {{ transform: rotate(360deg); }} }}</style>
        </div>
        """)] + hidden_groups + blank_fields

        results = get_predictions(audio_input, generate_audio, chunk_duration)

        # Only as many results as there are slots can be shown. Emitting a
        # value for every result would return more values than there are
        # output components.
        shown = results[:slot_count]

        group_visibility = []
        outputs = []
        for result in shown:
            group_visibility.append(gr.Group(visible=True))
            outputs.extend([
                result['emotion'],
                result['transcription'],
                result['sentiment'],
                result['image'],
                result['music']
            ])

        # Hide and blank the slots that received no result.
        unused_slots = slot_count - len(shown)
        group_visibility.extend([gr.Group(visible=False)] * unused_slots)
        outputs.extend([None] * (unused_slots * 5))

        # Replace the spinner; tell the user when chunks had to be dropped.
        status_html = ""
        if len(results) > slot_count:
            status_html = (
                "<p style='text-align: center; color: #4a4a4a;'>"
                f"Showing the first {slot_count} of {len(results)} chunks. "
                "Increase the chunk duration to cover the whole recording.</p>"
            )
        yield [gr.HTML(status_html)] + group_visibility + outputs

    def load_example(example_name):
        """Return ``(audio_path, selector_value)`` for the chosen example.

        With no selection, both outputs are cleared.
        """
        if not example_name:
            return None, None
        # The Audio component displays the file found at the returned path.
        return load_example_audio(example_name), example_name

    # Flattened result fields, in the same per-chunk order that
    # process_and_display and clear_all emit their values.
    result_components = [
        container[field]
        for container in output_containers
        for field in ('emotion', 'transcription', 'sentiment', 'image', 'music')
    ]

    process_btn.click(
        fn=process_and_display,
        inputs=[audio_input, generate_audio_checkbox, chunk_duration_input],
        outputs=[loading_indicator] + group_components + result_components
    )

    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=[audio_input] + group_components + result_components
        + [loading_indicator, chunk_duration_input, example_selector]
    )

    load_example_btn.click(
        fn=load_example,
        inputs=[example_selector],
        outputs=[audio_input, example_selector]
    )

interface.launch()