""" BELLO - Entornos Virtuales Afectivos (FULLY FIXED) Working image generation + Enhanced 360° viewer with play/pause, continuous/random, and chunk dropdown """ import os import math import struct import tempfile import json import base64 import warnings import subprocess import sys # Suppress TensorFlow warnings os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' warnings.filterwarnings('ignore', category=UserWarning) warnings.filterwarnings('ignore', category=DeprecationWarning) try: from pydub import AudioSegment except ImportError: print("Installing pydub...") subprocess.check_call([sys.executable, "-m", "pip", "install", "pydub"]) from pydub import AudioSegment import numpy as np import gradio as gr from PIL import Image, ImageDraw, ImageFont import cv2 # Check for ffmpeg try: subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True) except FileNotFoundError: print("⚠️ FFmpeg not found. Installing...") subprocess.check_call([sys.executable, "-m", "pip", "install", "ffmpeg-python"]) try: import torch except ImportError: print("Installing torch...") subprocess.check_call([sys.executable, "-m", "pip", "install", "torch"]) import torch try: import scipy.io.wavfile except ImportError: print("Installing scipy...") subprocess.check_call([sys.executable, "-m", "pip", "install", "scipy"]) import scipy.io.wavfile try: import requests except ImportError: print("Installing requests...") subprocess.check_call([sys.executable, "-m", "pip", "install", "requests"]) import requests try: from textblob import TextBlob except ImportError: print("Installing textblob...") subprocess.check_call([sys.executable, "-m", "pip", "install", "textblob"]) from textblob import TextBlob try: import librosa except ImportError: print("Installing librosa...") subprocess.check_call([sys.executable, "-m", "pip", "install", "librosa"]) import librosa # ========================= # Model Loading with Fallbacks # ========================= model = None model2 = None processor = None music_model = None device = None def load_emotion_model(model_path): """Load emotion model with fallback""" try: from tensorflow.keras.models import load_model if os.path.exists(model_path): model = load_model(model_path) print(f"✅ Emotion model loaded: {model_path}") return model else: print(f"⚠️ Emotion model not found: {model_path}") return None except Exception as e: print(f"⚠️ Error loading emotion model: {e}") return None model_path = "mymodel_SER_LSTM_RAVDESS.h5" model = load_emotion_model(model_path) # Try to load Whisper try: from faster_whisper import WhisperModel print("📥 Loading Whisper model...") model2 = WhisperModel("small", device="cpu", compute_type="int8") print("✅ Whisper model loaded") except Exception as e: print(f"⚠️ Whisper model not available: {e}") model2 = None # Try to load MusicGen def load_musicgen_model(): try: print("📥 Loading MusicGen model...") from transformers import AutoProcessor, MusicgenForConditionalGeneration try: device = "cuda" if torch.cuda.is_available() else "cpu" print(f"Using device: {device}") processor = AutoProcessor.from_pretrained("facebook/musicgen-small") music_model = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small") music_model.to(device) # Set to eval mode to avoid gradient tracking music_model.eval() print("✅ MusicGen model loaded") return processor, music_model, device except Exception as load_error: print(f"⚠️ Error during MusicGen loading: {load_error}") return None, None, None except ImportError as e: print(f"⚠️ Transformers not installed: {e}") return None, None, None except Exception as e: print(f"⚠️ MusicGen model not available: {e}") return None, None, None processor, music_model, device = load_musicgen_model() # ========================= # Audio Processing # ========================= def extract_mfcc(wav_filepath): """Extract MFCC features from audio""" try: y, sr = librosa.load(wav_filepath, sr=22050) mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40) mfcc = np.mean(mfcc.T, axis=0) return mfcc except Exception as e: print(f"⚠️ MFCC extraction failed: {e}") return None def transcribe(audio_path): """Transcribe audio using Whisper""" try: if model2 is None: return "[Transcripción no disponible]" segments, info = model2.transcribe(audio_path, language="es") text = "".join([segment.text for segment in segments]) return text if text else "[Sin habla detectada]" except Exception as e: print(f"⚠️ Transcription error: {e}") return "[Error en transcripción]" def chunk_audio_with_overlap(audio_path, chunk_duration=10, overlap_percent=20): """Split audio into chunks with overlap""" try: audio = AudioSegment.from_file(audio_path) duration_ms = len(audio) chunk_ms = chunk_duration * 1000 overlap_ms = int(chunk_ms * (overlap_percent / 100.0)) step_ms = chunk_ms - overlap_ms if chunk_duration <= 0: raise ValueError("Chunk duration must be positive") if chunk_duration > duration_ms / 1000: return [{"path": audio_path, "start_ms": 0, "end_ms": duration_ms, "original_index": 0, "overlap_ms": 0}], 1 chunk_files = [] num_chunks = math.ceil((duration_ms - overlap_ms) / step_ms) if step_ms > 0 else 1 for i in range(num_chunks): start_ms = i * step_ms end_ms = min(start_ms + chunk_ms, duration_ms) if start_ms >= duration_ms: break chunk = audio[start_ms:end_ms] with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: chunk.export(tmp_file.name, format="wav") chunk_files.append({ "path": tmp_file.name, "start_ms": start_ms, "end_ms": end_ms, "original_index": i, "overlap_ms": overlap_ms if i > 0 else 0 }) return chunk_files, len(chunk_files) except Exception as e: print(f"⚠️ Audio chunking error: {e}") try: audio_len = len(AudioSegment.from_file(audio_path)) return [{"path": audio_path, "start_ms": 0, "end_ms": audio_len, "original_index": 0, "overlap_ms": 0}], 1 except: return [], 0 # ========================= # Emotion & Sentiment Analysis # ========================= emotions = { 0: "neutral", 1: "calm", 2: "happy", 3: "sad", 4: "angry", 5: "fearful", 6: "disgust", 7: "surprised", } def predict_emotion_from_audio(wav_filepath): """Predict emotion from audio""" try: if model is None: return "neutral" test_point = extract_mfcc(wav_filepath) if test_point is not None: test_point = np.reshape(test_point, newshape=(1, 40, 1)) predictions = model.predict(test_point, verbose=0) predicted_class = np.argmax(predictions[0]) return emotions.get(predicted_class, "neutral") return "neutral" except Exception as e: print(f"⚠️ Emotion prediction error: {e}") return "neutral" def analyze_sentiment(text): """Analyze sentiment from text""" try: if not text or text.strip() == "" or text.startswith("["): return "neutral", 0.0 analysis = TextBlob(text) polarity = analysis.sentiment.polarity sentiment = "positive" if polarity > 0.1 else "negative" if polarity < -0.1 else "neutral" return sentiment, polarity except Exception as e: print(f"⚠️ Sentiment analysis error: {e}") return "neutral", 0.0 # ========================= # Image Generation (Fixed - Placeholder based on Sentiment) # ========================= def generate_image(sentiment_prediction, transcribed_text, chunk_idx, total_chunks): """ Generate a 360° panorama placeholder image based on sentiment. Creates colored equirectangular image with text overlay. """ try: print(f"🎨 Generating image for segment {chunk_idx + 1}, sentiment: {sentiment_prediction}") # Color mapping based on sentiment color_map = { "positive": (100, 200, 100), # Green (RGB) "negative": (200, 100, 100), # Red (RGB) "neutral": (100, 150, 200), # Blue (RGB) } # Get color with fallback color = color_map.get(sentiment_prediction.lower(), (100, 150, 200)) print(f" Using color: RGB{color}") # Create equirectangular image (2:1 aspect ratio for 360° panorama) print(f" Creating image: 1024x512") image = Image.new("RGB", (1024, 512), color=color) if image is None: raise ValueError("Image creation returned None") print(f" Image created successfully: {image.size}") # Add text overlay try: from PIL import ImageDraw, ImageFont draw = ImageDraw.Draw(image) # Try to use a nice font, fallback to default try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20) except: font = ImageFont.load_default() # Add sentiment text text = f"Segment {chunk_idx + 1} - {sentiment_prediction.upper()}" print(f" Adding text: {text}") # Calculate center position bbox = draw.textbbox((0, 0), text, font=font) text_width = bbox[2] - bbox[0] text_x = (1024 - text_width) // 2 text_y = 240 # Draw text with white color draw.text((text_x, text_y), text, fill=(255, 255, 255), font=font) print(f" Text added at position ({text_x}, {text_y})") # Add transcription preview (first 60 chars) if transcribed_text and not transcribed_text.startswith("["): preview = transcribed_text[:60] + "..." if len(transcribed_text) > 60 else transcribed_text preview_bbox = draw.textbbox((0, 0), preview, font=font) preview_width = preview_bbox[2] - preview_bbox[0] preview_x = (1024 - preview_width) // 2 preview_y = 280 draw.text((preview_x, preview_y), preview, fill=(200, 200, 200), font=font) print(f" Preview added: {preview[:40]}...") except Exception as text_error: print(f"⚠️ Text overlay error (non-critical): {text_error}") # Continue even if text fails print(f"✅ Image generated successfully for segment {chunk_idx + 1}") return image except Exception as e: print(f"❌ Image generation error: {e}") import traceback traceback.print_exc() # Return fallback image fallback = Image.new("RGB", (1024, 512), color=(100, 150, 200)) print(f" Using fallback image") return fallback def create_xmp_block(width, height): """Create XMP metadata block for 360° panorama""" xmp = ( f'\n' f'\n' f'\n' f'\n' f'\n' f'\n' f'' ) return xmp def write_xmp_to_jpg(input_path, output_path, width, height): """Inject XMP 360° metadata into JPEG""" try: with open(input_path, "rb") as f: data = f.read() if data[0:2] != b"\xFF\xD8": raise ValueError("Not a valid JPEG file") xmp_data = create_xmp_block(width, height) app1_marker = b"\xFF\xE1" xmp_header = b"http://ns.adobe.com/xap/1.0/\x00" xmp_bytes = xmp_data.encode("utf-8") length = len(xmp_header) + len(xmp_bytes) + 2 length_bytes = struct.pack(">H", length) output = bytearray() output.extend(data[0:2]) # SOI output.extend(app1_marker) output.extend(length_bytes) output.extend(xmp_header) output.extend(xmp_bytes) output.extend(data[2:]) with open(output_path, "wb") as f: f.write(output) except Exception as e: print(f"⚠️ XMP write error: {e}") def add_360_metadata(img): """Add 360° panorama metadata to image and save""" try: print(f" Saving image with metadata...") if img is None: raise ValueError("Input image is None") # Save to temporary file first with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_file: tmp_path = tmp_file.name print(f" Temp file: {tmp_path}") # Ensure image is in RGB mode if img.mode != 'RGB': print(f" Converting from {img.mode} to RGB") img = img.convert('RGB') # Save as JPEG img.save(tmp_path, "JPEG", quality=95) print(f" JPEG saved: {tmp_path}") # Inject XMP metadata try: write_xmp_to_jpg(tmp_path, tmp_path, img.width, img.height) print(f" XMP metadata injected") except Exception as xmp_error: print(f" ⚠️ XMP injection failed (non-critical): {xmp_error}") # Continue even if XMP fails print(f"✅ Image saved: {tmp_path}") return tmp_path except Exception as e: print(f"❌ Metadata error: {e}") import traceback traceback.print_exc() # Fallback: save without metadata try: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_file: tmp_path = tmp_file.name if img.mode != 'RGB': img = img.convert('RGB') img.save(tmp_path, "JPEG", quality=95) print(f"⚠️ Saved without metadata: {tmp_path}") return tmp_path except Exception as fallback_error: print(f"❌ Fallback also failed: {fallback_error}") return None # ========================= # Music Generation # ========================= def generate_music(transcribed_text, emotion_prediction, chunk_idx, total_chunks): """Generate music using MusicGen (if available)""" try: if processor is None or music_model is None: print("⚠️ MusicGen not available, skipping music generation") return None emotion_prompts = { "calm": "calm relaxing ambient music", "happy": "uplifting energetic joyful music", "sad": "melancholic emotional sad music", "angry": "intense dramatic aggressive music", "fearful": "eerie suspenseful music", "disgust": "dark unsettling music", "surprised": "playful expressive music", "neutral": "ambient peaceful background music" } prompt = emotion_prompts.get(emotion_prediction.lower(), "ambient background music") try: with torch.no_grad(): inputs = processor(text=[prompt], padding=True, return_tensors="pt").to(device) audio_values = music_model.generate(**inputs, max_new_tokens=256) sampling_rate = music_model.config.audio_encoder.sampling_rate audio_data = audio_values[0, 0].cpu().numpy() audio_data = audio_data / max(1e-9, np.max(np.abs(audio_data))) with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: scipy.io.wavfile.write(tmp_file.name, rate=sampling_rate, data=audio_data) print(f"✅ Music generated for segment {chunk_idx + 1}") return tmp_file.name except RuntimeError as runtime_error: print(f"⚠️ Music generation runtime error: {runtime_error}") print(" Skipping music for this segment") return None except Exception as e: print(f"⚠️ Music generation error: {e}") return None # ========================= # Chunk Processing # ========================= def process_chunk(chunk_info, generate_audio=True): """Process a single audio chunk""" try: chunk_path = chunk_info["path"] chunk_idx = chunk_info["original_index"] print(f"\n📋 ====== Processing Chunk {chunk_idx + 1} ======") print(f"🎯 Step 1: Emotion prediction...") emotion_prediction = predict_emotion_from_audio(chunk_path) print(f" ✓ Emotion: {emotion_prediction}") print(f"🎯 Step 2: Transcription...") transcribed_text = transcribe(chunk_path) print(f" ✓ Text: {transcribed_text[:60]}..." if len(transcribed_text) > 60 else f" ✓ Text: {transcribed_text}") print(f"🎯 Step 3: Sentiment analysis...") sentiment, polarity = analyze_sentiment(transcribed_text) print(f" ✓ Sentiment: {sentiment} (polarity: {polarity:.2f})") print(f"🎯 Step 4: Image generation...") image = generate_image(sentiment, transcribed_text, chunk_idx, -1) if image is None: raise ValueError("Image generation returned None") print(f" ✓ Image: {image.size} {image.mode}") print(f"🎯 Step 5: Adding 360° metadata...") image_360_path = add_360_metadata(image) print(f" ✓ Path: {image_360_path}") music_path = None if generate_audio: print(f"🎯 Step 6: Music generation...") music_path = generate_music(transcribed_text, emotion_prediction, chunk_idx, -1) if music_path: print(f" ✓ Music: {music_path}") else: print(f" ⚠️ Music skipped") result = { "chunk_index": chunk_idx + 1, "emotion": emotion_prediction, "transcription": transcribed_text[:100] if transcribed_text else "N/A", "sentiment": sentiment, "image": image, "image_360": image_360_path, "music": music_path, } print(f"✅ Chunk {chunk_idx + 1} processed successfully") return result except Exception as e: print(f"\n❌ ERROR processing chunk {chunk_idx + 1}: {e}") import traceback traceback.print_exc() return { "chunk_index": chunk_idx + 1, "emotion": "error", "transcription": str(e), "sentiment": "error", "image": Image.new("RGB", (1024, 512), color=(100, 100, 100)), "image_360": None, "music": None, } def get_predictions(audio_input, generate_audio=True, chunk_duration=10): """Process all chunks""" try: chunk_infos, total_chunks = chunk_audio_with_overlap(audio_input, chunk_duration, overlap_percent=20) results = [] for i, chunk_info in enumerate(chunk_infos): print(f"⏳ Processing chunk {i+1}/{total_chunks}") result = process_chunk(chunk_info, generate_audio) results.append(result) # Cleanup temp files for chunk_info in chunk_infos: try: if chunk_info["path"] != audio_input: os.unlink(chunk_info["path"]) except: pass return results except Exception as e: print(f"⚠️ Prediction error: {e}") return [] # ========================= # 360 Viewer - ENHANCED # ========================= def create_360_viewer_html(image_paths, audio_paths, output_path): """Create enhanced 360 viewer with play/pause, continuous/random, chunk dropdown""" image_data_list = [] for img_path in image_paths: try: with open(img_path, "rb") as f: img_data = base64.b64encode(f.read()).decode("utf-8") image_data_list.append(f"data:image/jpeg;base64,{img_data}") except Exception as e: print(f"⚠️ Image encoding error: {e}") image_data_list.append(None) audio_data_list = [] for audio_path in audio_paths: if audio_path: try: with open(audio_path, "rb") as f: audio_data = base64.b64encode(f.read()).decode("utf-8") audio_data_list.append(f"data:audio/wav;base64,{audio_data}") except Exception as e: print(f"⚠️ Audio encoding error: {e}") audio_data_list.append(None) else: audio_data_list.append(None) html_content = f""" EVA 360 - Visualizador Afectivo
Parado
""" with open(output_path, "w") as f: f.write(html_content) return output_path # ========================= # Gradio Interface # ========================= def process_audio(audio_input, generate_audio, chunk_duration): """Main processing function""" if not audio_input: return "❌ Por favor carga un archivo de audio", [], None # Validate and fix chunk duration if chunk_duration is None: chunk_duration = 10 else: try: chunk_duration = float(chunk_duration) if chunk_duration < 1: chunk_duration = 10 print(f"⚠️ Chunk duration too small, using default: 10s") elif chunk_duration > 120: chunk_duration = 120 print(f"⚠️ Chunk duration too large, using maximum: 120s") except (ValueError, TypeError): chunk_duration = 10 print(f"⚠️ Invalid chunk duration, using default: 10s") print(f"\n🚀 Starting processing with {chunk_duration}s chunks...") results = get_predictions(audio_input, generate_audio, chunk_duration) if not results: return "❌ Error al procesar el audio", [], None # Build output markdown output_md = f"## ✅ Procesamiento Completado\n\n" output_md += f"**Total de segmentos:** {len(results)}\n\n" images_list = [] for i, result in enumerate(results): output_md += f"### 📊 Segmento {i+1}\n" output_md += f"- **Emoción:** {result['emotion']}\n" output_md += f"- **Transcripción:** {result['transcription']}\n" output_md += f"- **Sentimiento:** {result['sentiment']}\n\n" if result['image']: images_list.append((result['image'], f"Seg {i+1}")) # Create viewer viewer_path = None image_paths = [r['image_360'] for r in results if r['image_360']] audio_paths = [r['music'] for r in results] if image_paths: try: with tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode='w') as tmp: viewer_path = create_360_viewer_html(image_paths, audio_paths, tmp.name) print(f"✅ Viewer created: {viewer_path}") except Exception as e: print(f"⚠️ Error creating viewer: {e}") viewer_path = None return output_md, images_list, viewer_path # Create interface with gr.Blocks(title="BELLO Seamless Enhanced") as demo: gr.Markdown("# 🌀 BELLO Seamless - Entornos Virtuales Afectivos") gr.Markdown(""" **BELLO Seamless** explora las emociones a través de la voz y crea experiencias inmersivas en 360°. ### ¿Cómo usar? 1. 🎤 Carga un archivo de audio 2. ⚙️ Ajusta la duración de segmentos (5-60 segundos) 3. 🎵 Marca para generar música (opcional, más lento) 4. 🚀 Presiona "Generar EVA" 5. 📥 Descarga y abre el visualizador HTML en tu navegador 6. ▶️ Usa los controles: Play/Pause, Continuo/Aleatorio, Dropdown de segmentos """) with gr.Row(): with gr.Column(scale=2): audio = gr.Audio(label="🎤 Audio", type="filepath", sources=["upload", "microphone"]) with gr.Column(scale=1): duration = gr.Number(label="⏱ Duración Segmento (s)", value=10, minimum=2, maximum=120, step=1) gen_music = gr.Checkbox(label="🎵 Generar Música", value=False) with gr.Row(): process_btn = gr.Button("🚀 Generar EVA", variant="primary") clear_btn = gr.Button("🗑 Limpiar", variant="secondary") with gr.Tabs(): with gr.TabItem("📊 Resultados"): output_text = gr.Markdown(value="*Resultados aparecerán aquí*") gallery = gr.Gallery(label="🖼 Imágenes", columns=2, rows=2) with gr.TabItem("🎬 Visualizador 360°"): gr.Markdown("### 📥 Descarga tu EVA aquí") gr.Markdown("Una vez que el procesamiento esté completo, haz clic en el botón de descarga para obtener el visualizador HTML interactivo.") viewer = gr.File(label="📥 Descargar Visualizador 360°", type="filepath") def clear(): return None, "❌ Borrado", [], None process_btn.click( fn=process_audio, inputs=[audio, gen_music, duration], outputs=[output_text, gallery, viewer] ) clear_btn.click( fn=clear, outputs=[audio, output_text, gallery, viewer] ) if __name__ == "__main__": print("\n" + "="*60) print("🌀 BELLO Seamless Enhanced - Iniciando...") print("="*60 + "\n") demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )