"""
BELLO - Entornos Virtuales Afectivos (FULLY FIXED)
Working image generation + Enhanced 360° viewer with play/pause, continuous/random, and chunk dropdown
"""
import os
import math
import struct
import tempfile
import json
import base64
import warnings
import subprocess
import sys
# Suppress TensorFlow warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=DeprecationWarning)
try:
from pydub import AudioSegment
except ImportError:
print("Installing pydub...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "pydub"])
from pydub import AudioSegment
import numpy as np
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
import cv2
# Check for ffmpeg
# Probes for the ffmpeg executable by running `ffmpeg -version` once at import
# time; FileNotFoundError means no such executable could be launched.
# NOTE(review): the fallback installs the `ffmpeg-python` pip package, which
# appears to be a Python wrapper only and presumably does not ship the ffmpeg
# binary itself -- confirm, and install ffmpeg through the OS package manager.
# NOTE(review): check=True makes subprocess.run raise CalledProcessError on a
# non-zero exit status, and that exception is not handled here.
try:
    subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
except FileNotFoundError:
    print("⚠️ FFmpeg not found. Installing...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "ffmpeg-python"])
try:
import torch
except ImportError:
print("Installing torch...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "torch"])
import torch
try:
import scipy.io.wavfile
except ImportError:
print("Installing scipy...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "scipy"])
import scipy.io.wavfile
try:
import requests
except ImportError:
print("Installing requests...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "requests"])
import requests
try:
from textblob import TextBlob
except ImportError:
print("Installing textblob...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "textblob"])
from textblob import TextBlob
try:
import librosa
except ImportError:
print("Installing librosa...")
subprocess.check_call([sys.executable, "-m", "pip", "install", "librosa"])
import librosa
# =========================
# Model Loading with Fallbacks
# =========================
# Module-level handles for the optional ML backends. Every one starts out as
# None and is only replaced further down if its loader succeeds:
#   model                          -> emotion classifier
#   model2                         -> Whisper transcriber
#   processor, music_model, device -> MusicGen components
model = model2 = processor = music_model = device = None
def load_emotion_model(model_path):
    """Load the Keras emotion model stored at ``model_path``.

    TensorFlow is imported inside the function so that a missing or broken
    installation is reported as a warning instead of aborting the module.

    Args:
        model_path: Path of the saved model file.

    Returns:
        The loaded model, or ``None`` when the file does not exist or when
        importing TensorFlow / loading the model raises.
    """
    try:
        from tensorflow.keras.models import load_model

        # Guard clause: nothing to load when the file is absent.
        if not os.path.exists(model_path):
            print(f"⚠️ Emotion model not found: {model_path}")
            return None

        loaded = load_model(model_path)
        print(f"✅ Emotion model loaded: {model_path}")
        return loaded
    except Exception as e:
        print(f"⚠️ Error loading emotion model: {e}")
        return None
# File name of the saved emotion model. It has no directory component, so it
# is looked up relative to the current working directory.
model_path = "mymodel_SER_LSTM_RAVDESS.h5"
# Loaded once at import time; stays None when load_emotion_model() cannot
# find or load the file.
model = load_emotion_model(model_path)
# Try to load Whisper
try:
    # Imported here rather than at the top of the file so that a missing
    # faster_whisper package only disables transcription.
    from faster_whisper import WhisperModel
    print("📥 Loading Whisper model...")
    # "small" model, pinned to the CPU with int8 compute.
    model2 = WhisperModel("small", device="cpu", compute_type="int8")
    print("✅ Whisper model loaded")
except Exception as e:
    # Any failure (import or model construction) leaves model2 as None;
    # transcribe() checks for that and returns a placeholder string.
    print(f"⚠️ Whisper model not available: {e}")
    model2 = None
# Try to load MusicGen
def load_musicgen_model():
    """Load the MusicGen processor and model ("facebook/musicgen-small").

    The model is moved to CUDA when ``torch.cuda.is_available()`` is true and
    to the CPU otherwise, then switched to eval mode.

    Returns:
        A ``(processor, music_model, device)`` tuple, where ``device`` is the
        string ``"cuda"`` or ``"cpu"``. ``(None, None, None)`` is returned,
        after printing a warning, when ``transformers`` cannot be imported or
        loading fails for any other reason.
    """
    unavailable = (None, None, None)
    checkpoint = "facebook/musicgen-small"
    try:
        print("📥 Loading MusicGen model...")
        from transformers import AutoProcessor, MusicgenForConditionalGeneration

        try:
            if torch.cuda.is_available():
                target = "cuda"
            else:
                target = "cpu"
            print(f"Using device: {target}")
            proc = AutoProcessor.from_pretrained(checkpoint)
            net = MusicgenForConditionalGeneration.from_pretrained(checkpoint)
            net.to(target)
            # Set to eval mode to avoid gradient tracking
            net.eval()
            print("✅ MusicGen model loaded")
        except Exception as load_error:
            print(f"⚠️ Error during MusicGen loading: {load_error}")
            return unavailable
        return proc, net, target
    except ImportError as e:
        print(f"⚠️ Transformers not installed: {e}")
        return unavailable
    except Exception as e:
        print(f"⚠️ MusicGen model not available: {e}")
        return unavailable
# Runs once at import time. All three names are None when MusicGen could not
# be loaded (load_musicgen_model() returns (None, None, None) on failure).
processor, music_model, device = load_musicgen_model()
# =========================
# Audio Processing
# =========================
def extract_mfcc(wav_filepath):
    """Return the averaged MFCC feature vector of an audio file.

    The file is loaded with librosa at a 22050 Hz sample rate, 40 MFCCs are
    computed, and the transposed coefficient matrix is averaged along axis 0.

    Args:
        wav_filepath: Path of the audio file to analyse.

    Returns:
        The averaged MFCC array, or ``None`` (after printing a warning) when
        loading or feature extraction raises.
    """
    try:
        signal, sample_rate = librosa.load(wav_filepath, sr=22050)
        coefficients = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=40)
        return np.mean(coefficients.T, axis=0)
    except Exception as e:
        print(f"⚠️ MFCC extraction failed: {e}")
        return None
def transcribe(audio_path):
    """Transcribe an audio file with the module-level Whisper model.

    The transcription is requested with ``language="es"`` and the text of all
    returned segments is concatenated without a separator.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The transcribed text, or one of three bracketed placeholder strings:
        Whisper model not loaded, no text produced, or an error was raised.
    """
    try:
        if model2 is None:
            return "[Transcripción no disponible]"
        segments, _info = model2.transcribe(audio_path, language="es")
        pieces = [segment.text for segment in segments]
        # An empty transcription falls through to the placeholder.
        return "".join(pieces) or "[Sin habla detectada]"
    except Exception as e:
        print(f"⚠️ Transcription error: {e}")
        return "[Error en transcripción]"
def chunk_audio_with_overlap(audio_path, chunk_duration=10, overlap_percent=20):
    """Split an audio file into fixed-length chunks that overlap.

    Consecutive chunks start ``chunk_duration * (1 - overlap_percent / 100)``
    seconds apart, so each chunk after the first repeats the tail of the
    previous one. Every chunk is exported to its own temporary ``.wav`` file;
    those files are not deleted here, so the caller owns them.

    Args:
        audio_path: Path of the audio file to split.
        chunk_duration: Chunk length in seconds; must be positive.
        overlap_percent: Share of each chunk that overlaps the previous one;
            must satisfy ``0 <= overlap_percent < 100``.

    Returns:
        A ``(chunks, count)`` tuple. Each chunk is a dict with the keys
        ``path``, ``start_ms``, ``end_ms``, ``original_index`` and
        ``overlap_ms`` (0 for the first chunk). Special cases:

        * The audio is shorter than one chunk, or the arguments are invalid,
          or chunking fails: a single entry covering the whole file is
          returned whose ``path`` is ``audio_path`` itself (not a temp copy).
        * The audio cannot be read at all: ``([], 0)``.
    """
    created_paths = []
    try:
        # Validate before doing any work. An overlap of 100% or more would
        # give a non-positive step and silently drop most of the audio.
        if chunk_duration <= 0:
            raise ValueError("Chunk duration must be positive")
        if not 0 <= overlap_percent < 100:
            raise ValueError("Overlap percent must be in the range [0, 100)")

        audio = AudioSegment.from_file(audio_path)
        duration_ms = len(audio)
        if chunk_duration > duration_ms / 1000:
            whole = {
                "path": audio_path,
                "start_ms": 0,
                "end_ms": duration_ms,
                "original_index": 0,
                "overlap_ms": 0,
            }
            return [whole], 1

        # Work in whole milliseconds so offsets stay ints even when the
        # duration arrives as a float (e.g. from a UI slider).
        chunk_ms = max(1, int(chunk_duration * 1000))
        overlap_ms = int(chunk_ms * (overlap_percent / 100.0))
        step_ms = max(1, chunk_ms - overlap_ms)

        chunk_files = []
        # Start offsets 0, step, 2*step, ... while start < duration - overlap,
        # i.e. ceil((duration - overlap) / step) chunks in total.
        starts = range(0, duration_ms - overlap_ms, step_ms)
        for index, start_ms in enumerate(starts):
            end_ms = min(start_ms + chunk_ms, duration_ms)
            # Reserve a unique name and close the handle before exporting, so
            # the exporter is the only writer of the file.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
                chunk_path = tmp_file.name
            created_paths.append(chunk_path)
            # export() hands back the open output file; close it right away
            # instead of leaving it to the garbage collector.
            audio[start_ms:end_ms].export(chunk_path, format="wav").close()
            chunk_files.append({
                "path": chunk_path,
                "start_ms": start_ms,
                "end_ms": end_ms,
                "original_index": index,
                "overlap_ms": overlap_ms if index > 0 else 0,
            })
        return chunk_files, len(chunk_files)
    except Exception as e:
        print(f"⚠️ Audio chunking error: {e}")
        # The partial result is discarded, so remove its temp files too.
        for stale_path in created_paths:
            try:
                os.remove(stale_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file is harmless.
                pass
        # Fall back to treating the whole file as a single chunk.
        try:
            audio_len = len(AudioSegment.from_file(audio_path))
        except Exception as fallback_error:
            print(f"⚠️ Audio fallback failed: {fallback_error}")
            return [], 0
        whole = {
            "path": audio_path,
            "start_ms": 0,
            "end_ms": audio_len,
            "original_index": 0,
            "overlap_ms": 0,
        }
        return [whole], 1
# =========================
# Emotion & Sentiment Analysis
# =========================
# Maps the class index chosen in predict_emotion_from_audio() to its emotion
# label. The position in the tuple is the class index, starting at 0.
emotions = dict(enumerate((
    "neutral", "calm", "happy", "sad",
    "angry", "fearful", "disgust", "surprised",
)))
def predict_emotion_from_audio(wav_filepath):
    """Predict the emotion label for an audio file.

    MFCC features from ``extract_mfcc`` are reshaped to ``(1, 40, 1)`` and
    passed to the module-level emotion model; the index of the highest score
    is looked up in ``emotions``.

    Args:
        wav_filepath: Path of the audio file to classify.

    Returns:
        The emotion label. ``"neutral"`` is returned when no model is loaded,
        feature extraction fails, the predicted index is unknown, or any
        exception is raised.
    """
    try:
        if model is None:
            return "neutral"
        features = extract_mfcc(wav_filepath)
        if features is None:
            return "neutral"
        # The shape is passed positionally: the `newshape=` keyword of
        # np.reshape is deprecated as of NumPy 2.1, while the positional form
        # works on every NumPy version.
        batch = np.reshape(features, (1, 40, 1))
        predictions = model.predict(batch, verbose=0)
        # Convert the NumPy integer to a plain int before the dict lookup.
        predicted_class = int(np.argmax(predictions[0]))
        return emotions.get(predicted_class, "neutral")
    except Exception as e:
        print(f"⚠️ Emotion prediction error: {e}")
        return "neutral"
def analyze_sentiment(text):
    """Classify text as positive, negative or neutral using TextBlob polarity.

    Empty or whitespace-only text, and text starting with ``"["`` (the
    bracketed placeholders returned by ``transcribe``), are treated as
    neutral without running the analysis.

    NOTE(review): ``transcribe`` requests Spanish (``language="es"``), while
    TextBlob's default sentiment analyser is presumably English-oriented --
    confirm that polarity is meaningful for the transcribed text.

    Args:
        text: The text to analyse.

    Returns:
        A ``(sentiment, polarity)`` tuple. ``sentiment`` is ``"positive"``
        for polarity above 0.1, ``"negative"`` below -0.1 and ``"neutral"``
        otherwise. ``("neutral", 0.0)`` is returned for skipped input and on
        any error.
    """
    try:
        skip = not text or not text.strip() or text.startswith("[")
        if skip:
            return "neutral", 0.0

        polarity = TextBlob(text).sentiment.polarity
        if polarity > 0.1:
            label = "positive"
        elif polarity < -0.1:
            label = "negative"
        else:
            label = "neutral"
        return label, polarity
    except Exception as e:
        print(f"⚠️ Sentiment analysis error: {e}")
        return "neutral", 0.0
# =========================
# Image Generation (Fixed - Placeholder based on Sentiment)
# =========================
def _draw_centered_text(draw, text, y, font, fill, canvas_width):
    """Draw ``text`` horizontally centred at height ``y`` and return its x offset."""
    bbox = draw.textbbox((0, 0), text, font=font)
    x = (canvas_width - (bbox[2] - bbox[0])) // 2
    draw.text((x, y), text, fill=fill, font=font)
    return x


def generate_image(sentiment_prediction, transcribed_text, chunk_idx, total_chunks):
    """Generate a placeholder 360° panorama image for one audio segment.

    The image is a 1024x512 (2:1) canvas filled with a single colour chosen
    from the sentiment, with a centred "Segment N - SENTIMENT" caption and,
    when a real transcription is available, a preview of its first 60
    characters underneath.

    Args:
        sentiment_prediction: Sentiment label; "positive", "negative" and
            "neutral" (case-insensitive) have dedicated colours, any other
            label uses the neutral colour. ``None`` or an empty string is
            treated as "neutral".
        transcribed_text: Transcription of the segment. Skipped when empty
            or when it starts with "[" (a placeholder).
        chunk_idx: Zero-based segment index; shown one-based in the caption.
        total_chunks: Currently unused; kept for interface compatibility.

    Returns:
        A PIL image. A failure while drawing the text is logged and the
        plain coloured image is returned; any other failure is logged with a
        traceback and a plain neutral-coloured image is returned instead.
    """
    width, height = 1024, 512
    neutral_color = (100, 150, 200)
    try:
        print(f"🎨 Generating image for segment {chunk_idx + 1}, sentiment: {sentiment_prediction}")
        # Color mapping based on sentiment (RGB)
        color_map = {
            "positive": (100, 200, 100),  # Green
            "negative": (200, 100, 100),  # Red
            "neutral": neutral_color,     # Blue
        }
        # A missing sentiment is drawn as "neutral" rather than aborting.
        sentiment = sentiment_prediction or "neutral"
        color = color_map.get(sentiment.lower(), neutral_color)
        print(f" Using color: RGB{color}")
        # Equirectangular canvas: 2:1 aspect ratio for a 360° panorama.
        print(f" Creating image: {width}x{height}")
        image = Image.new("RGB", (width, height), color=color)
        print(f" Image created successfully: {image.size}")

        # The text overlay is best-effort: a failure here must not discard
        # the image itself.
        try:
            draw = ImageDraw.Draw(image)
            # Prefer DejaVu Sans when installed; otherwise use PIL's default.
            # `except Exception` (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            try:
                font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20)
            except Exception:
                font = ImageFont.load_default()

            text = f"Segment {chunk_idx + 1} - {sentiment.upper()}"
            print(f" Adding text: {text}")
            text_y = 240
            text_x = _draw_centered_text(draw, text, text_y, font, (255, 255, 255), width)
            print(f" Text added at position ({text_x}, {text_y})")

            # Transcription preview (first 60 chars), skipped for placeholders.
            if transcribed_text and not transcribed_text.startswith("["):
                preview = transcribed_text
                if len(preview) > 60:
                    preview = preview[:60] + "..."
                _draw_centered_text(draw, preview, 280, font, (200, 200, 200), width)
                print(f" Preview added: {preview[:40]}...")
        except Exception as text_error:
            print(f"⚠️ Text overlay error (non-critical): {text_error}")

        print(f"✅ Image generated successfully for segment {chunk_idx + 1}")
        return image
    except Exception as e:
        print(f"❌ Image generation error: {e}")
        import traceback
        traceback.print_exc()
        # Return fallback image
        fallback = Image.new("RGB", (width, height), color=neutral_color)
        print(f" Using fallback image")
        return fallback
def create_xmp_block(width, height):
"""Create XMP metadata block for 360° panorama"""
xmp = (
f'\n'
f'