import gradio as gr
import numpy as np
import librosa
import requests
from io import BytesIO
from PIL import Image
import os
import secrets
import tempfile
import base64
import math
import struct
import cv2
import shutil
from tensorflow.keras.models import load_model
from faster_whisper import WhisperModel
from textblob import TextBlob
import torch
import scipy.io.wavfile
from transformers import AutoProcessor, MusicgenForConditionalGeneration
from pydub import AudioSegment
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse, HTMLResponse
from gradio.routes import mount_gradio_app

# Try to import PyGithub; if it is missing, print a friendly message and
# disable the GitHub publishing feature instead of failing at import time.
try:
    from github import Github, GithubException

    GITHUB_AVAILABLE = True
except ImportError:
    GITHUB_AVAILABLE = False
    print("⚠️ PyGithub no instalado. La publicación en GitHub no estará disponible.")


# ============================================================
# 1. AI model loading
# ============================================================
def load_emotion_model(model_path):
    """Load the Keras speech-emotion model stored at *model_path*.

    Returns the loaded model, or ``None`` when loading fails (the error is
    printed rather than raised so the app can still start).
    """
    try:
        m = load_model(model_path)
        print("Emotion model loaded successfully")
        return m
    except Exception as e:
        print("Error loading emotion model:", e)
        return None


model_path = "mymodel_SER_LSTM_RAVDESS.h5"
model = load_emotion_model(model_path)

# Whisper (speech-to-text), loaded on CPU with int8 compute.
model_size = "small"
whisper_model = WhisperModel(model_size, device="cpu", compute_type="int8")


# MusicGen
def load_musicgen_model():
    """Load the MusicGen processor and model onto CUDA when available.

    Returns a ``(processor, model, device)`` tuple, or ``(None, None, None)``
    when loading fails.
    """
    try:
        device = "cuda" if torch.cuda.is_available() else "cpu"
        processor = AutoProcessor.from_pretrained("facebook/musicgen-small")
        music_model = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small")
        music_model.to(device)
        print("MusicGen model loaded successfully")
        return processor, music_model, device
    except Exception as e:
        print("Error loading MusicGen:", e)
        return None, None, None


processor, music_model, device = load_musicgen_model()

# DeepAI API key (optional, used for image generation/upscaling).
DEEPAI_API_KEY = os.getenv("DeepAI_api_key")


# ============================================================
# 2. Audio and emotion utilities
# ============================================================
def chunk_audio(audio_path, chunk_duration=10):
    """Split an audio file into WAV chunks of *chunk_duration* seconds.

    Returns ``(paths, count)``. When the audio is shorter than one chunk, or
    when anything goes wrong, the original file is returned as the single
    "chunk": ``([audio_path], 1)``. Chunk files are temporary files that the
    caller is responsible for deleting.
    """
    chunk_files = []
    try:
        if chunk_duration <= 0:
            raise ValueError("Chunk duration must be positive")
        audio = AudioSegment.from_file(audio_path)
        duration_ms = len(audio)
        chunk_ms = chunk_duration * 1000
        if chunk_duration > duration_ms / 1000:
            return [audio_path], 1
        num_chunks = math.ceil(duration_ms / chunk_ms)
        for i in range(num_chunks):
            start_ms = i * chunk_ms
            end_ms = min((i + 1) * chunk_ms, duration_ms)
            chunk = audio[start_ms:end_ms]
            # Reserve a unique path, then close the handle before exporting:
            # re-opening a still-open NamedTemporaryFile by name is
            # platform-dependent.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                chunk_path = tmp.name
            chunk_files.append(chunk_path)
            chunk.export(chunk_path, format="wav")
        return chunk_files, num_chunks
    except Exception as e:
        print("Error chunking audio:", e)
        # Falling back to the original file: drop any chunks already written
        # so they do not accumulate in the temp directory.
        for leftover in chunk_files:
            try:
                os.unlink(leftover)
            except OSError:
                pass
        return [audio_path], 1


def transcribe(wav_filepath):
    """Transcribe *wav_filepath* with Whisper and return the joined text.

    Returns the literal string ``"Transcription failed"`` on any error.
    """
    try:
        segments, _ = whisper_model.transcribe(wav_filepath, beam_size=5)
        return "".join(seg.text for seg in segments)
    except Exception as e:
        print("Error transcribing:", e)
        return "Transcription failed"


def extract_mfcc(wav_file_name):
    """Return the time-averaged 40-coefficient MFCC vector, or ``None`` on error."""
    try:
        y, sr = librosa.load(wav_file_name)
        mfccs = np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40).T, axis=0)
        return mfccs
    except Exception as e:
        print("Error extracting MFCC:", e)
        return None


# Class index -> emotion label used to decode the emotion model's output.
emotions = {
    0: "neutral",
    1: "calm",
    2: "happy",
    3: "sad",
    4: "angry",
    5: "fearful",
    6: "disgust",
    7: "surprised",
}


def predict_emotion_from_audio(wav_filepath):
    """Predict the emotion label for an audio file.

    Returns one of the ``emotions`` labels, ``"unknown"`` for an unmapped
    class index, or a short error string (``"Model not loaded"``,
    ``"Feature extraction error"``, ``"Prediction error"``).
    """
    try:
        if model is None:
            return "Model not loaded"
        feats = extract_mfcc(wav_filepath)
        if feats is None:
            return "Feature extraction error"
        feats = np.reshape(feats, (1, 40, 1))
        pred = model.predict(feats, verbose=0)
        # Convert the NumPy integer to a plain int before the dict lookup.
        label = int(np.argmax(pred[0]))
        return emotions.get(label, "unknown")
    except Exception as e:
        print("Emotion prediction error:", e)
        return "Prediction error"


def analyze_sentiment(text):
    """Classify *text* as positive/negative/neutral using TextBlob polarity.

    Returns ``(sentiment, polarity)``. Empty or whitespace-only text yields
    ``("neutral", 0.0)``. Polarity above 0.1 is positive, below -0.1 negative.
    """
    if not text or not text.strip():
        return "neutral", 0.0
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.1:
        sentiment = "positive"
    elif polarity < -0.1:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return sentiment, polarity


# ============================================================
# 3. Prompts for image and music
# ============================================================
def get_image_prompt(sentiment, text, chunk_idx, total_chunks):
    """Build the text-to-image prompt for a segment.

    Only the first 200 characters of *text* are used. *chunk_idx* and
    *total_chunks* are accepted for signature parity but are not used.
    """
    base = f"Generate an equirectangular 360° panoramic graphite sketch drawing, detailed pencil texture with faint neon glows, cinematic lighting of: {text[:200]}."
    if sentiment == "positive":
        return base + " Use bright, high contrast, rich colors, joyful atmosphere."
    elif sentiment == "negative":
        return base + " Use dark, low contrast, somber tones, melancholic atmosphere."
    else:
        return base + " Use balanced, neutral colors, calm atmosphere."


def get_music_prompt(emotion, text, chunk_idx, total_chunks):
    """Build the MusicGen prompt for a segment from its emotion label.

    Only the first 100 characters of *text* are used. Unknown emotions fall
    back to a generic "Background music" prompt. *chunk_idx* and
    *total_chunks* are accepted but not used.
    """
    snippet = text[:100]
    prompts = {
        "neutral": f"Neutral ambient orchestral music, steady tempo, no strong emotions, inspired by: {snippet}",
        "calm": f"Calm, peaceful orchestral music, slow strings, soft dynamics, inspired by: {snippet}",
        "happy": f"Happy, uplifting orchestral music, major key, lively rhythm, inspired by: {snippet}",
        "sad": f"Sad, melancholic orchestral music, minor key, slow tempo, inspired by: {snippet}",
        "angry": f"Angry, aggressive orchestral music, dissonant, strong percussion, inspired by: {snippet}",
        "fearful": f"Fearful, tense orchestral music, unstable harmonies, suspenseful, inspired by: {snippet}",
        "disgust": f"Disgusted, harsh orchestral music, irregular rhythm, rough textures, inspired by: {snippet}",
        "surprised": f"Surprised, sudden changes, playful orchestral music, inspired by: {snippet}",
    }
    return prompts.get(emotion.lower(), f"Background music with {emotion} mood for: {snippet}")


# ============================================================
# 4.
# Image generation (DeepAI) and music (MusicGen)
# ============================================================

# Upper bound, in seconds, for any single DeepAI HTTP call so a stalled
# request cannot hang segment processing indefinitely.
_HTTP_TIMEOUT_SECONDS = 120


def upscale_image(image, target_width=4096, target_height=2048):
    """Return *image* resized to ``target_width`` x ``target_height``.

    When ``DEEPAI_API_KEY`` is set, the image is first sent to DeepAI's
    ``torch-srgan`` endpoint and the result is resized; otherwise (or on any
    failure) the input is resized locally with Lanczos resampling.
    """
    target_size = (target_width, target_height)
    tmp_path = None
    try:
        if not DEEPAI_API_KEY:
            return image.resize(target_size, Image.Resampling.LANCZOS)
        # Reserve a path and close the handle before writing to it by name.
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            tmp_path = tmp.name
        # JPEG cannot store alpha/palette modes, so normalise to RGB first.
        rgb = image if image.mode == "RGB" else image.convert("RGB")
        rgb.save(tmp_path, "JPEG", quality=95)
        # The context manager guarantees the upload handle is closed.
        with open(tmp_path, "rb") as upload:
            response = requests.post(
                "https://api.deepai.org/api/torch-srgan",
                files={"image": upload},
                headers={"api-key": DEEPAI_API_KEY},
                timeout=_HTTP_TIMEOUT_SECONDS,
            )
        data = response.json()
        if "output_url" in data:
            img_resp = requests.get(data["output_url"], timeout=_HTTP_TIMEOUT_SECONDS)
            up_img = Image.open(BytesIO(img_resp.content))
            return up_img.resize(target_size, Image.Resampling.LANCZOS)
        # Fallback: the API answered without an output URL.
        return image.resize(target_size, Image.Resampling.LANCZOS)
    except Exception as e:
        print("Upscale error:", e)
        return image.resize(target_size, Image.Resampling.LANCZOS)
    finally:
        # The temp JPEG is only an upload buffer; never leave it behind.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass


def generate_image(sentiment, text, chunk_idx, total_chunks):
    """Generate the panoramic image for one segment.

    Uses DeepAI ``text2img`` when ``DEEPAI_API_KEY`` is set; otherwise, or if
    the API returns no ``output_url``, a white 1024x512 placeholder is used.
    The image is then passed through ``upscale_image``. On any error a white
    4096x2048 image is returned.
    """
    try:
        prompt = get_image_prompt(sentiment, text, chunk_idx, total_chunks)
        img = None
        if DEEPAI_API_KEY:
            response = requests.post(
                "https://api.deepai.org/api/text2img",
                data={"text": prompt, "width": 1024, "height": 512, "image_generator_version": "hd"},
                headers={"api-key": DEEPAI_API_KEY},
                timeout=_HTTP_TIMEOUT_SECONDS,
            )
            data = response.json()
            if "output_url" in data:
                img_resp = requests.get(data["output_url"], timeout=_HTTP_TIMEOUT_SECONDS)
                img = Image.open(BytesIO(img_resp.content))
        if img is None:
            img = Image.new("RGB", (1024, 512), color="white")
        return upscale_image(img)
    except Exception as e:
        print("Image generation error:", e)
        return Image.new("RGB", (4096, 2048), color="white")


def generate_music(text, emotion, chunk_idx, total_chunks):
    """Generate a music clip for one segment with MusicGen.

    Returns the path of a temporary WAV file, or ``None`` when MusicGen is not
    loaded or generation fails. The caller owns the returned file.
    """
    try:
        if processor is None or music_model is None:
            return None
        prompt = get_music_prompt(emotion, text, chunk_idx, total_chunks)
        if len(prompt) > 200:
            prompt = prompt[:200] + "..."
        inputs = processor(text=[prompt], padding=True, return_tensors="pt").to(device)
        audio_values = music_model.generate(**inputs, max_new_tokens=512)
        sampling_rate = music_model.config.audio_encoder.sampling_rate
        audio_data = audio_values[0, 0].cpu().numpy()
        # Peak-normalise; the floor avoids dividing by zero on silent output.
        audio_data = audio_data / max(1e-9, np.max(np.abs(audio_data)))
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            wav_path = tmp.name
        scipy.io.wavfile.write(wav_path, rate=sampling_rate, data=audio_data)
        return wav_path
    except Exception as e:
        print("Music generation error:", e)
        return None


# ============================================================
# 5. 360° metadata (XMP)
# ============================================================
def create_xmp_block(width, height):
    """Return an XMP packet declaring a full equirectangular panorama.

    The packet uses the Google Photo Sphere (GPano) namespace and marks the
    whole ``width`` x ``height`` frame as the panorama, with no cropping.
    """
    return f'''<?xpacket begin="\ufeff" id="W5M0MpCehiHzreSzNTczkc9d"?>
<x:xmpmeta xmlns:x="adobe:ns:meta/">
  <rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">
    <rdf:Description rdf:about="" xmlns:GPano="http://ns.google.com/photos/1.0/panorama/">
      <GPano:UsePanoramaViewer>True</GPano:UsePanoramaViewer>
      <GPano:ProjectionType>equirectangular</GPano:ProjectionType>
      <GPano:FullPanoWidthPixels>{width}</GPano:FullPanoWidthPixels>
      <GPano:FullPanoHeightPixels>{height}</GPano:FullPanoHeightPixels>
      <GPano:CroppedAreaImageWidthPixels>{width}</GPano:CroppedAreaImageWidthPixels>
      <GPano:CroppedAreaImageHeightPixels>{height}</GPano:CroppedAreaImageHeightPixels>
      <GPano:CroppedAreaLeftPixels>0</GPano:CroppedAreaLeftPixels>
      <GPano:CroppedAreaTopPixels>0</GPano:CroppedAreaTopPixels>
    </rdf:Description>
  </rdf:RDF>
</x:xmpmeta>
<?xpacket end="w"?>'''


def write_xmp_to_jpg(input_path, output_path, width, height):
    """Copy a JPEG, inserting an XMP APP1 segment right after the SOI marker.

    *input_path* and *output_path* may be the same file: the input is read
    fully into memory before the output is opened.

    Raises:
        ValueError: if the input does not start with the JPEG SOI marker, or
            the XMP packet does not fit in a single APP1 segment.
    """
    with open(input_path, "rb") as f:
        data = f.read()
    if data[0:2] != b"\xFF\xD8":
        raise ValueError("Not a valid JPEG")
    xmp_header = b"http://ns.adobe.com/xap/1.0/\x00"
    xmp_bytes = create_xmp_block(width, height).encode("utf-8")
    # The 2-byte segment length counts itself but not the marker.
    length = len(xmp_header) + len(xmp_bytes) + 2
    if length > 0xFFFF:
        raise ValueError("XMP packet too large for a single APP1 segment")
    segment = b"".join((b"\xFF\xE1", struct.pack(">H", length), xmp_header, xmp_bytes))
    with open(output_path, "wb") as f:
        f.write(data[0:2] + segment + data[2:])


def add_360_metadata(img):
    """Save *img* as a 4096x2048 JPEG with panorama XMP metadata.

    Returns the path of a temporary JPEG owned by the caller. If embedding the
    metadata fails, the image is still saved (without the guarantee of
    metadata) and its path returned.
    """
    out_path = None
    try:
        target_width, target_height = 4096, 2048
        if img.size != (target_width, target_height):
            img = img.resize((target_width, target_height), Image.Resampling.LANCZOS)
        # Close the temp handle before saving/rewriting the file by name.
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            out_path = tmp.name
        img.save(out_path, "JPEG", quality=90)
        write_xmp_to_jpg(out_path, out_path, target_width, target_height)
        return out_path
    except Exception as e:
        print("Error adding 360 metadata:", e)
        # Reuse the already-reserved path when there is one, so a failed
        # attempt does not leave an extra temp file behind.
        if out_path is None:
            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
                out_path = tmp.name
        img.save(out_path, "JPEG", quality=90)
        return out_path


#
# ============================================================
# 6. Segment processing
# ============================================================
def process_chunk(chunk_path, idx, total, gen_audio):
    """Run the full pipeline on one audio segment.

    Predicts the emotion, transcribes the audio, derives the sentiment,
    generates the image (plus its 360° JPEG) and, when *gen_audio* is true,
    a music clip.

    Returns a dict with keys ``emotion``, ``transcription``, ``sentiment``,
    ``image``, ``image_360`` and ``music``. On any error a placeholder dict
    with a white image and ``None`` paths is returned instead of raising.
    """
    try:
        emotion = predict_emotion_from_audio(chunk_path)
        text = transcribe(chunk_path)
        sentiment, _ = analyze_sentiment(text)
        img = generate_image(sentiment, text, idx, total)
        img_360_path = add_360_metadata(img)
        music_path = generate_music(text, emotion, idx, total) if gen_audio else None
        return {
            "emotion": emotion,
            "transcription": text,
            "sentiment": sentiment,
            "image": img,
            "image_360": img_360_path,
            "music": music_path,
        }
    except Exception as e:
        print(f"Chunk {idx+1} error:", e)
        return {
            "emotion": "Error",
            "transcription": "Transcription failed",
            "sentiment": "error",
            "image": Image.new("RGB", (4096, 2048), color="white"),
            "image_360": None,
            "music": None,
        }


def get_predictions(audio_input, gen_audio, chunk_duration):
    """Split *audio_input* into segments and process each one in order.

    Returns the list of per-segment result dicts produced by
    ``process_chunk``. Temporary chunk files are removed afterwards; the
    original input file is never deleted.
    """
    chunk_files, total = chunk_audio(audio_input, chunk_duration)
    results = []
    for i, cf in enumerate(chunk_files):
        print(f"Processing chunk {i+1}/{total}")
        results.append(process_chunk(cf, i, total, gen_audio))
    for cf in chunk_files:
        # chunk_audio hands back the input itself when no split happened.
        if cf == audio_input:
            continue
        try:
            os.unlink(cf)
        except OSError as e:
            # Best-effort cleanup: report the failure but keep the results.
            print("Could not remove temporary chunk:", e)
    return results


# ============================================================
# 7.
# 360° HTML viewer generation
# ============================================================
def create_360_viewer_html(image_paths, audio_paths, output_path):
    """Write the standalone viewer HTML file and return *output_path*.

    Each image is downscaled to 2048x1024 and encoded as a base64 JPEG data
    URI; each audio path is base64-encoded, with ``None`` recorded for
    missing entries.
    """
    image_data_list = []
    for img_path in image_paths:
        # The context manager closes the underlying file once encoded.
        with Image.open(img_path) as img:
            small = img.resize((2048, 1024), Image.Resampling.LANCZOS)
        if small.mode != "RGB":
            # JPEG cannot store alpha/palette modes.
            small = small.convert("RGB")
        buf = BytesIO()
        small.save(buf, format="JPEG", quality=75, optimize=True)
        b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        image_data_list.append(f"data:image/jpeg;base64,{b64}")

    audio_base64_list = []
    for ap in audio_paths:
        if ap and os.path.exists(ap):
            with open(ap, "rb") as f:
                audio_base64_list.append(base64.b64encode(f.read()).decode("utf-8"))
        else:
            audio_base64_list.append(None)

    # NOTE(review): the template as visible here only references the scene
    # count; the markup/script that consumes image_data_list and
    # audio_base64_list is not visible in this view - confirm against the
    # full template.
    html = f" EVA 360 - Visualizador Inmersivo\nScene 1 / {len(image_data_list)}\n"
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(html)
    return output_path


# ============================================================
# 8. Publishing to GitHub (Gist) via OAuth
# ============================================================
_pending_gists = {}  # {state token: html_file_path}


def get_space_base_url():
    """Return the public base URL of the app.

    Uses the ``SPACE_HOST`` environment variable (set by Hugging Face Spaces)
    when present, otherwise the local development address.
    """
    host = os.getenv("SPACE_HOST")
    if host:
        return f"https://{host}"
    # Local mode.
    return "http://localhost:7860"


def trigger_github_oauth(html_file_path):
    """Start the GitHub OAuth flow for publishing *html_file_path*.

    Returns the GitHub authorization URL, or a user-facing error message
    (which never starts with ``http``) when publishing is unavailable.
    """
    from urllib.parse import urlencode

    if not GITHUB_AVAILABLE:
        return "❌ PyGithub no está instalado. Contacta al administrador."
    client_id = os.getenv("GITHUB_CLIENT_ID")
    if not client_id:
        return "⚠️ GitHub OAuth no configurado (falta GITHUB_CLIENT_ID)."
    # Register the pending file only once the flow can actually start, so
    # misconfigured deployments do not accumulate orphaned entries.
    token = secrets.token_urlsafe(16)
    _pending_gists[token] = html_file_path
    redirect_uri = f"{get_space_base_url()}/github_callback"
    # urlencode percent-escapes the redirect URI and the other values.
    query = urlencode(
        {
            "client_id": client_id,
            "scope": "gist",
            "state": token,
            "redirect_uri": redirect_uri,
        }
    )
    return f"https://github.com/login/oauth/authorize?{query}"


# ============================================================
# 9. FastAPI application for the OAuth callback
# ============================================================
fastapi_app = FastAPI()


@fastapi_app.get("/oauth_redirect.html")
async def serve_redirect_page():
    """Serve the static "authorization completed" page."""
    return HTMLResponse(
        " Autorizando...\n"
        "\n"
        "Autorización completada\n"
        "\n"
        "Esta ventana se cerrará automáticamente. Puedes volver a la aplicación.\n"
        "\n"
    )


@fastapi_app.get("/github_callback")
def github_callback(request: Request):
    """Handle GitHub's OAuth redirect and publish the pending HTML as a Gist.

    Declared as a plain ``def`` so FastAPI runs it in its thread pool: the
    handler makes blocking HTTP calls (``requests`` and PyGithub) that would
    otherwise stall the event loop.
    """
    from html import escape

    code = request.query_params.get("code")
    state = request.query_params.get("state")
    if not code or not state:
        return RedirectResponse(url="/")
    if not GITHUB_AVAILABLE:
        return HTMLResponse("PyGithub no instalado.", status_code=500)
    client_id = os.getenv("GITHUB_CLIENT_ID")
    client_secret = os.getenv("GITHUB_CLIENT_SECRET")
    if not client_id or not client_secret:
        return HTMLResponse("Error: GitHub OAuth no configurado correctamente.", status_code=500)

    # Validate (and consume) the state before exchanging the code, so a
    # forged or replayed callback never triggers the token exchange.
    html_path = _pending_gists.pop(state, None)
    if not html_path or not os.path.exists(html_path):
        return HTMLResponse("El archivo HTML ya no está disponible. Vuelve a generar el entorno.", status_code=404)

    token_url = "https://github.com/login/oauth/access_token"
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        "code": code,
        "state": state,
    }
    headers = {"Accept": "application/json"}
    try:
        resp = requests.post(token_url, data=payload, headers=headers, timeout=30)
    except requests.RequestException as e:
        return HTMLResponse(f"Error al obtener token: {escape(str(e))}", status_code=500)
    if resp.status_code != 200:
        # Upstream text is escaped because it is embedded in an HTML response.
        return HTMLResponse(f"Error al obtener token: {escape(resp.text)}", status_code=500)
    access_token = resp.json().get("access_token")
    if not access_token:
        return HTMLResponse("No se recibió access_token", status_code=500)

    try:
        from github import InputFileContent

        g = Github(access_token)
        user = g.get_user()
        with open(html_path, "r", encoding="utf-8") as f:
            content = f.read()
        # PyGithub requires InputFileContent values here, not plain dicts.
        gist = user.create_gist(
            public=True,
            description="Entorno Virtual Afectivo - EVA 360",
            files={f"eva_360_{secrets.token_hex(4)}.html": InputFileContent(content)},
        )
        gist_url = escape(gist.html_url)
        return HTMLResponse(
            " Publicado en GitHub\n"
            "\n"
            "✅ Entorno publicado correctamente\n"
            "\n"
            f"Tu EVA 360 está disponible en: {gist_url}\n"
            "\n"
            "Puedes cerrar esta ventana y volver a la aplicación.\n"
            "\n"
        )
    except Exception as e:
        return HTMLResponse(f"Error al crear Gist: {escape(str(e))}", status_code=500)


# ============================================================
# 10. Gradio interface
# ============================================================
output_containers = []
group_components = []


def process_and_display(audio_input, generate_audio, chunk_duration):
    """Gradio generator handler: process the audio and fill the UI.

    Yields twice: first a "processing" state that hides every segment group,
    then the final state. Each yielded list matches the click handler's
    outputs: loading indicator, one visibility update per segment group,
    seven values per segment slot, then viewer file, status message and the
    hidden JS output.
    """
    if chunk_duration is None or chunk_duration <= 0:
        chunk_duration = 10
    n_slots = len(output_containers)

    yield (
        [gr.HTML(f"\n\nProcesando audio en segmentos de {chunk_duration} segundos...\n\n")]
        + [gr.update(visible=False)] * len(group_components)
        + [None] * (n_slots * 7)
        + [None, None, ""]
    )

    results = get_predictions(audio_input, generate_audio, chunk_duration)
    if len(results) > n_slots:
        # The UI has a fixed number of segment slots. Emitting values for
        # extra segments would make the output list longer than the
        # registered components, so only the first n_slots are shown.
        print(f"Only the first {n_slots} of {len(results)} segments can be displayed")
        results = results[:n_slots]

    outputs = []
    group_vis = []
    all_360_images = []
    all_music_paths = []
    for res in results:
        group_vis.append(gr.update(visible=True))
        outputs.extend([
            res["emotion"],
            res["transcription"],
            res["sentiment"],
            res["image"],
            res["image_360"],
            res["music"],
            res["music"],
        ])
        if res["image_360"]:
            all_360_images.append(res["image_360"])
            # Keep audio index-aligned with its image; the viewer builder
            # accepts None for scenes without music.
            all_music_paths.append(res["music"])

    # Hide and blank every slot that has no segment.
    unused = n_slots - len(results)
    group_vis.extend(gr.update(visible=False) for _ in range(unused))
    outputs.extend([None] * (7 * unused))

    viewer_html_path = None
    if all_360_images:
        with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as tmp:
            reserved_path = tmp.name
        viewer_html_path = create_360_viewer_html(all_360_images, all_music_paths, reserved_path)

    info_msg = "✅ Entorno generado. Puedes descargar el HTML y, si lo deseas, publicarlo en tu GitHub con el botón especial."
    yield [gr.HTML("")] + group_vis + outputs + [viewer_html_path, info_msg, ""]


def clear_all():
    """Reset every input and output component to its initial state."""
    return (
        [None]
        + [gr.update(visible=False)] * len(group_components)
        + [None] * (len(output_containers) * 7)
        + [gr.HTML(""), 10, None, None, ""]
    )


def publish_to_github(html_file_path):
    """Return a Markdown status for the publish button.

    On success the message contains the GitHub authorization link; otherwise
    it is the error message explaining why publishing is not possible.
    """
    if not html_file_path or not os.path.exists(html_file_path):
        return "❌ No hay ningún entorno generado. Primero genera tu EVA."
    if not GITHUB_AVAILABLE:
        return "❌ PyGithub no está instalado. No se puede publicar."
    auth_url = trigger_github_oauth(html_file_path)
    if auth_url.startswith("http"):
        return f"🔐 **Para publicar en tu GitHub, [haz clic aquí para autorizar]({auth_url}).** Se abrirá una ventana emergente. Luego vuelve aquí."
    else:
        return auth_url


# UI construction
custom_css = """
.download-section {
    background: #f9f9f9;
    padding: 20px;
    border-radius: 15px;
    border: 1px solid #ddd;
    margin: 20px 0;
}
.download-button {
    background: #4CAF50 !important;
    color: white !important;
}
"""

with gr.Blocks(css=custom_css, title="EVA 360 - Entornos Virtuales Afectivos") as interface:
    gr.Markdown("# Bello: Entornos Virtuales Afectivos")
    gr.Markdown("""
    Sube o graba un audio y el sistema generará un entorno 360° con imágenes y música basada en la emoción y el contenido.
    """)

    with gr.Row():
        with gr.Column(scale=2):
            audio_input = gr.Audio(label="Audio de entrada", type="filepath", sources=["microphone", "upload"])
        with gr.Column(scale=1):
            chunk_duration = gr.Number(label="Duración del segmento (segundos)", value=10, minimum=1, maximum=60, step=1)
            gen_music = gr.Checkbox(label="Generar música (más lento)", value=False)

    with gr.Row():
        process_btn = gr.Button("Generar EVA", variant="primary")
        clear_btn = gr.Button("Limpiar", variant="secondary")

    loading_indicator = gr.HTML("")

    # Pre-build a fixed pool of hidden segment groups; the handlers toggle
    # their visibility because the component tree cannot grow per request.
    for i in range(20):
        with gr.Group(visible=False) as grp:
            gr.Markdown(f"### Segmento {i+1}")
            with gr.Row():
                emo = gr.Label(label="Emoción")
                trans = gr.Label(label="Transcripción")
                sent = gr.Label(label="Sentimiento")
            with gr.Row():
                img_out = gr.Image(label="Imagen 360° generada")
                img_file = gr.File(label="Descargar imagen 360°", type="filepath")
            with gr.Row():
                audio_out = gr.Audio(label="Música generada")
                audio_file = gr.File(label="Descargar música", type="filepath")
            gr.HTML("\n")
        group_components.append(grp)
        output_containers.append({
            "emotion": emo,
            "transcription": trans,
            "sentiment": sent,
            "image": img_out,
            "image_360": img_file,
            "music": audio_out,
            "music_file": audio_file,
        })

    with gr.Group(elem_classes="download-section"):
        html_download = gr.File(label="Descargar tu EVA (HTML)", type="filepath", interactive=False)
        publish_btn = gr.Button("📤 Publicar en mi GitHub (Gist)", variant="primary")
        publish_status = gr.Markdown("*Al hacer clic, se te redirigirá a GitHub para autorizar la publicación.*")

    js_output = gr.HTML(visible=False)

    # Flattened per-segment components, in the order the handlers emit them.
    segment_outputs = [
        cont[key]
        for cont in output_containers
        for key in ("emotion", "transcription", "sentiment", "image", "image_360", "music", "music_file")
    ]

    process_btn.click(
        fn=process_and_display,
        inputs=[audio_input, gen_music, chunk_duration],
        outputs=[loading_indicator] + group_components + segment_outputs + [html_download, publish_status, js_output],
    )
    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=[audio_input] + group_components + segment_outputs + [loading_indicator, chunk_duration, html_download, publish_status, js_output],
    )
    publish_btn.click(
        fn=publish_to_github,
        inputs=[html_download],
        outputs=[publish_status],
    )

# ============================================================
# 11. Mount the FastAPI + Gradio app
# ============================================================
app = mount_gradio_app(fastapi_app, interface, path="/")

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)