import base64
import math
import os
import secrets
import shutil
import struct
import tempfile
from io import BytesIO

import gradio as gr
import numpy as np
import librosa
import requests
from PIL import Image
import cv2
from tensorflow.keras.models import load_model
from faster_whisper import WhisperModel
from textblob import TextBlob
import torch
import scipy.io.wavfile
from transformers import AutoProcessor, MusicgenForConditionalGeneration
from pydub import AudioSegment
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse, HTMLResponse
from gradio.routes import mount_gradio_app

# PyGithub is optional: without it the app still runs, but publishing is off.
try:
    from github import Github, GithubException
    GITHUB_AVAILABLE = True
except ImportError:
    GITHUB_AVAILABLE = False
    print("⚠️ PyGithub no instalado. La publicación en GitHub no estará disponible.")


# ============================================================
# 1. AI model loading
# ============================================================

def load_emotion_model(model_path):
    """Load the Keras emotion model stored at ``model_path``.

    Returns the loaded model, or ``None`` when loading fails (the error is
    printed rather than raised).
    """
    try:
        loaded = load_model(model_path)
    except Exception as e:
        print("Error loading emotion model:", e)
        return None
    else:
        print("Emotion model loaded successfully")
        return loaded


def load_musicgen_model():
    """Load the ``facebook/musicgen-small`` processor and model.

    The model is moved to CUDA when available, otherwise it stays on CPU.

    Returns:
        A ``(processor, model, device_name)`` tuple, or ``(None, None, None)``
        when anything in the loading sequence fails.
    """
    checkpoint = "facebook/musicgen-small"
    try:
        target = "cuda" if torch.cuda.is_available() else "cpu"
        proc = AutoProcessor.from_pretrained(checkpoint)
        net = MusicgenForConditionalGeneration.from_pretrained(checkpoint)
        net.to(target)
        print("MusicGen model loaded successfully")
        return proc, net, target
    except Exception as e:
        print("Error loading MusicGen:", e)
        return None, None, None


# Module-level singletons; every one of them is loaded at import time.
model_path = "mymodel_SER_LSTM_RAVDESS.h5"
model = load_emotion_model(model_path)

model_size = "small"
whisper_model = WhisperModel(model_size, device="cpu", compute_type="int8")

processor, music_model, device = load_musicgen_model()

# When this variable is unset, the image functions skip the DeepAI calls.
DEEPAI_API_KEY = os.getenv("DeepAI_api_key")

# ============================================================
# 2.
# Audio and emotion utilities
# ============================================================

def chunk_audio(audio_path, chunk_duration=10):
    """Split an audio file into consecutive WAV chunks.

    Args:
        audio_path: Path of the audio file to split.
        chunk_duration: Length of each chunk in seconds; must be positive.

    Returns:
        A ``(paths, count)`` tuple. When the audio is shorter than one chunk,
        or when anything goes wrong (invalid duration, unreadable file, failed
        export), the original file is returned untouched as
        ``([audio_path], 1)``. Otherwise ``paths`` are temporary ``.wav``
        files that the caller is responsible for deleting.
    """
    chunk_files = []
    try:
        # Validate before decoding so a bad duration costs no audio I/O.
        if chunk_duration <= 0:
            raise ValueError("Chunk duration must be positive")

        audio = AudioSegment.from_file(audio_path)
        duration_ms = len(audio)
        chunk_ms = chunk_duration * 1000
        if chunk_duration > duration_ms / 1000:
            return [audio_path], 1

        num_chunks = math.ceil(duration_ms / chunk_ms)
        for i in range(num_chunks):
            start_ms = i * chunk_ms
            end_ms = min((i + 1) * chunk_ms, duration_ms)
            # Reserve a name, close the handle, then let pydub write to it.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                chunk_path = tmp.name
            # Register before exporting so a failed export is still cleaned up.
            chunk_files.append(chunk_path)
            audio[start_ms:end_ms].export(chunk_path, format="wav")
        return chunk_files, num_chunks
    except Exception as e:
        print("Error chunking audio:", e)
        # The caller only ever sees [audio_path] on failure, so partially
        # written chunks would otherwise be leaked.
        for leftover in chunk_files:
            try:
                os.unlink(leftover)
            except OSError:
                pass
        return [audio_path], 1


def transcribe(wav_filepath):
    """Transcribe ``wav_filepath`` with the module-level Whisper model.

    Returns the concatenated segment texts, or the literal string
    ``"Transcription failed"`` when transcription raises.
    """
    try:
        segments, _ = whisper_model.transcribe(wav_filepath, beam_size=5)
        return "".join(seg.text for seg in segments)
    except Exception as e:
        print("Error transcribing:", e)
        return "Transcription failed"


def extract_mfcc(wav_file_name):
    """Return the time-averaged 40-coefficient MFCC vector of an audio file.

    Returns ``None`` when loading or feature extraction fails.
    """
    try:
        y, sr = librosa.load(wav_file_name)
        # Transpose, then average over axis 0, to get one value per coefficient.
        mfccs = np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40).T, axis=0)
        return mfccs
    except Exception as e:
        print("Error extracting MFCC:", e)
        return None


# Class index -> emotion label used to decode the emotion model's output.
emotions = {
    0: "neutral",
    1: "calm",
    2: "happy",
    3: "sad",
    4: "angry",
    5: "fearful",
    6: "disgust",
    7: "surprised",
}


def predict_emotion_from_audio(wav_filepath):
    """Predict the emotion label for an audio file.

    Returns one of the ``emotions`` labels, ``"unknown"`` for an unmapped
    class index, or a diagnostic string (``"Model not loaded"``,
    ``"Feature extraction error"``, ``"Prediction error"``) on failure.
    """
    try:
        if model is None:
            return "Model not loaded"
        feats = extract_mfcc(wav_filepath)
        if feats is None:
            return "Feature extraction error"
        feats = np.reshape(feats, (1, 40, 1))
        pred = model.predict(feats, verbose=0)
        # Cast the numpy integer to a plain int before the dict lookup.
        label = int(np.argmax(pred[0]))
        return emotions.get(label, "unknown")
    except Exception as e:
        print("Emotion prediction error:", e)
        return "Prediction error"


def analyze_sentiment(text):
    """Classify ``text`` as positive, negative or neutral using TextBlob.

    Returns:
        ``(sentiment, polarity)``. Empty or whitespace-only text yields
        ``("neutral", 0.0)`` without invoking TextBlob. Polarity above 0.1 is
        "positive", below -0.1 is "negative", anything else "neutral".
    """
    if not text or not text.strip():
        return "neutral", 0.0
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.1:
        sentiment = "positive"
    elif polarity < -0.1:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return sentiment, polarity


# ============================================================
# 3. Image and music prompts
# ============================================================

def get_image_prompt(sentiment, text, chunk_idx, total_chunks):
    """Build the text-to-image prompt for one segment.

    Only the first 200 characters of ``text`` are used. ``chunk_idx`` and
    ``total_chunks`` are accepted but not used by the prompt.
    """
    base = f"Generate an equirectangular 360° panoramic graphite sketch drawing, detailed pencil texture with faint neon glows, cinematic lighting of: {text[:200]}."
    if sentiment == "positive":
        return base + " Use bright, high contrast, rich colors, joyful atmosphere."
    if sentiment == "negative":
        return base + " Use dark, low contrast, somber tones, melancholic atmosphere."
    return base + " Use balanced, neutral colors, calm atmosphere."


# Emotion label -> musical style description; every prompt shares the same
# ", inspired by: <text>" suffix, so only the style part is stored here.
_MUSIC_STYLES = {
    "neutral": "Neutral ambient orchestral music, steady tempo, no strong emotions",
    "calm": "Calm, peaceful orchestral music, slow strings, soft dynamics",
    "happy": "Happy, uplifting orchestral music, major key, lively rhythm",
    "sad": "Sad, melancholic orchestral music, minor key, slow tempo",
    "angry": "Angry, aggressive orchestral music, dissonant, strong percussion",
    "fearful": "Fearful, tense orchestral music, unstable harmonies, suspenseful",
    "disgust": "Disgusted, harsh orchestral music, irregular rhythm, rough textures",
    "surprised": "Surprised, sudden changes, playful orchestral music",
}


def get_music_prompt(emotion, text, chunk_idx, total_chunks):
    """Build the MusicGen prompt for one segment.

    The emotion is matched case-insensitively; unknown emotions fall back to a
    generic "Background music with ... mood" prompt. Only the first 100
    characters of ``text`` are used. ``chunk_idx`` and ``total_chunks`` are
    accepted but not used by the prompt.
    """
    snippet = text[:100]
    style = _MUSIC_STYLES.get(emotion.lower())
    if style is None:
        return f"Background music with {emotion} mood for: {snippet}"
    return f"{style}, inspired by: {snippet}"


# ============================================================
# 4.
# Image generation (DeepAI) and music generation (MusicGen)
# ============================================================

# Upper bound, in seconds, for each DeepAI HTTP request so that a stalled
# connection cannot hang segment processing forever.
_HTTP_TIMEOUT = 120


def _remove_quietly(path):
    """Delete ``path`` if it exists; cleanup failures are ignored."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError:
        pass


def upscale_image(image, target_width=4096, target_height=2048):
    """Return ``image`` at ``target_width`` x ``target_height``.

    When ``DEEPAI_API_KEY`` is set the image is first sent to DeepAI's
    ``torch-srgan`` endpoint and the result is resized to the target size.
    Without a key, when the response carries no ``output_url``, or on any
    error, the input image is simply resized with Lanczos resampling.
    """
    size = (target_width, target_height)
    if not DEEPAI_API_KEY:
        return image.resize(size, Image.Resampling.LANCZOS)

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            tmp_path = tmp.name
        # JPEG cannot store alpha/palette modes, so normalise to RGB first.
        image.convert("RGB").save(tmp_path, "JPEG", quality=95)
        # Open the upload in a context manager so the handle is always closed.
        with open(tmp_path, "rb") as upload:
            response = requests.post(
                "https://api.deepai.org/api/torch-srgan",
                files={"image": upload},
                headers={"api-key": DEEPAI_API_KEY},
                timeout=_HTTP_TIMEOUT,
            )
        data = response.json()
        if "output_url" in data:
            img_resp = requests.get(data["output_url"], timeout=_HTTP_TIMEOUT)
            up_img = Image.open(BytesIO(img_resp.content))
            return up_img.resize(size, Image.Resampling.LANCZOS)
        return image.resize(size, Image.Resampling.LANCZOS)
    except Exception as e:
        print("Upscale error:", e)
        return image.resize(size, Image.Resampling.LANCZOS)
    finally:
        # The temporary JPEG is only needed for the upload.
        _remove_quietly(tmp_path)


def generate_image(sentiment, text, chunk_idx, total_chunks):
    """Generate the panorama image for one segment.

    With ``DEEPAI_API_KEY`` set, DeepAI ``text2img`` is asked for a 1024x512
    image; otherwise (or when the response has no ``output_url``) a white
    1024x512 placeholder is used. The result is passed through
    ``upscale_image``. On any error a white 4096x2048 image is returned.
    """
    try:
        prompt = get_image_prompt(sentiment, text, chunk_idx, total_chunks)
        img = None
        if DEEPAI_API_KEY:
            response = requests.post(
                "https://api.deepai.org/api/text2img",
                data={"text": prompt, "width": 1024, "height": 512, "image_generator_version": "hd"},
                headers={"api-key": DEEPAI_API_KEY},
                timeout=_HTTP_TIMEOUT,
            )
            data = response.json()
            if "output_url" in data:
                img_resp = requests.get(data["output_url"], timeout=_HTTP_TIMEOUT)
                img = Image.open(BytesIO(img_resp.content))
        if img is None:
            img = Image.new("RGB", (1024, 512), color="white")
        return upscale_image(img)
    except Exception as e:
        print("Image generation error:", e)
        return Image.new("RGB", (4096, 2048), color="white")


def generate_music(text, emotion, chunk_idx, total_chunks):
    """Generate a short music clip for one segment with MusicGen.

    Returns the path of a temporary ``.wav`` file, or ``None`` when MusicGen
    is not loaded or generation fails.
    """
    wav_path = None
    try:
        if processor is None or music_model is None:
            return None
        prompt = get_music_prompt(emotion, text, chunk_idx, total_chunks)
        if len(prompt) > 200:
            prompt = prompt[:200] + "..."
        inputs = processor(text=[prompt], padding=True, return_tensors="pt").to(device)
        audio_values = music_model.generate(**inputs, max_new_tokens=512)
        sampling_rate = music_model.config.audio_encoder.sampling_rate
        audio_data = audio_values[0, 0].cpu().numpy()
        # Peak-normalise; the 1e-9 floor avoids dividing by zero on silence.
        audio_data = audio_data / max(1e-9, np.max(np.abs(audio_data)))
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            wav_path = tmp.name
        scipy.io.wavfile.write(wav_path, rate=sampling_rate, data=audio_data)
        return wav_path
    except Exception as e:
        print("Music generation error:", e)
        # Do not leave a half-written temp file behind.
        _remove_quietly(wav_path)
        return None


# ============================================================
# 5. 360° metadata (XMP)
# ============================================================

def create_xmp_block(width, height):
    """Return the XMP packet text embedded by ``write_xmp_to_jpg``.

    NOTE(review): the literal below is a single space and ``width``/``height``
    are unused, so no panorama metadata is actually produced. The XMP markup
    was presumably lost from this template - confirm and restore it.
    """
    return f''' '''


def write_xmp_to_jpg(input_path, output_path, width, height):
    """Copy a JPEG, inserting an XMP APP1 segment right after the SOI marker.

    ``input_path`` and ``output_path`` may be the same file: the input is read
    completely before the output is opened.

    Raises:
        ValueError: If the input does not start with the JPEG SOI marker.
        struct.error: If the segment is too large for the 16-bit length field.
    """
    with open(input_path, "rb") as src:
        data = src.read()
    if data[0:2] != b"\xFF\xD8":
        raise ValueError("Not a valid JPEG")

    xmp_header = b"http://ns.adobe.com/xap/1.0/\x00"
    xmp_bytes = create_xmp_block(width, height).encode("utf-8")
    # The APP1 length field counts itself (2 bytes) plus the payload.
    length_bytes = struct.pack(">H", len(xmp_header) + len(xmp_bytes) + 2)

    output = b"".join((
        data[0:2],      # SOI
        b"\xFF\xE1",    # APP1 marker
        length_bytes,
        xmp_header,
        xmp_bytes,
        data[2:],       # rest of the original JPEG
    ))
    with open(output_path, "wb") as dst:
        dst.write(output)


def add_360_metadata(img):
    """Save ``img`` as a 4096x2048 JPEG carrying the XMP block.

    Returns the path of a temporary ``.jpg`` file. If resizing or the XMP
    injection fails, the image is saved without metadata to the same path.
    If even that save fails, the temp file is removed and the error raised.
    """
    target_width, target_height = 4096, 2048
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
        out_path = tmp.name
    try:
        if img.size != (target_width, target_height):
            img = img.resize((target_width, target_height), Image.Resampling.LANCZOS)
        if img.mode != "RGB":
            # JPEG cannot store alpha/palette modes.
            img = img.convert("RGB")
        img.save(out_path, "JPEG", quality=90)
        write_xmp_to_jpg(out_path, out_path, target_width, target_height)
        return out_path
    except Exception as e:
        print("Error adding 360 metadata:", e)
        try:
            # Reuse the same temp path instead of leaking it and making another.
            img.convert("RGB").save(out_path, "JPEG", quality=90)
        except Exception:
            _remove_quietly(out_path)
            raise
        return out_path


#
# ============================================================
# 6. Segment processing
# ============================================================

def process_chunk(chunk_path, idx, total, gen_audio):
    """Run the full analysis/generation pipeline on one audio chunk.

    Args:
        chunk_path: Path of the audio chunk.
        idx: Zero-based chunk index.
        total: Total number of chunks.
        gen_audio: When truthy, also generate a music clip.

    Returns:
        A dict with the keys ``emotion``, ``transcription``, ``sentiment``,
        ``image``, ``image_360`` and ``music``. If any step raises, a fallback
        dict is returned with error labels, a white placeholder image and
        ``None`` for ``image_360`` and ``music``.
    """
    try:
        emotion = predict_emotion_from_audio(chunk_path)
        text = transcribe(chunk_path)
        sentiment, _ = analyze_sentiment(text)
        img = generate_image(sentiment, text, idx, total)
        img_360_path = add_360_metadata(img)
        music_path = generate_music(text, emotion, idx, total) if gen_audio else None
        return {
            "emotion": emotion,
            "transcription": text,
            "sentiment": sentiment,
            "image": img,
            "image_360": img_360_path,
            "music": music_path,
        }
    except Exception as e:
        print(f"Chunk {idx+1} error:", e)
        return {
            "emotion": "Error",
            "transcription": "Transcription failed",
            "sentiment": "error",
            "image": Image.new("RGB", (4096, 2048), color="white"),
            "image_360": None,
            "music": None,
        }


def get_predictions(audio_input, gen_audio, chunk_duration):
    """Split ``audio_input`` into chunks and process each one in order.

    Temporary chunk files are removed afterwards, even when processing
    raises; the original input file is never deleted.

    Returns:
        A list with one ``process_chunk`` result dict per chunk.
    """
    chunk_files, total = chunk_audio(audio_input, chunk_duration)
    results = []
    try:
        for i, cf in enumerate(chunk_files):
            print(f"Processing chunk {i+1}/{total}")
            results.append(process_chunk(cf, i, total, gen_audio))
    finally:
        for cf in chunk_files:
            if cf == audio_input:
                continue
            try:
                os.unlink(cf)
            except OSError:
                # Best-effort cleanup: a chunk that is already gone is fine.
                pass
    return results


# ============================================================
# 7.
# 360° HTML viewer generation (red airplane + transitions)
# ============================================================

def create_360_viewer_html(image_paths, audio_paths, output_path):
    """Write the standalone viewer HTML file and return its path.

    Each panorama is resized to 2048x1024 and encoded as a base64 JPEG data
    URI; an image that cannot be read is replaced by a white placeholder.
    Each audio path is base64-encoded, with ``None`` for missing entries.

    NOTE(review): the template below only interpolates the panorama count.
    The encoded image/audio lists are built but never written, so the viewer
    markup/script was presumably lost from this literal - confirm and restore.
    """
    image_data_list = []
    for img_path in image_paths:
        buf = BytesIO()
        try:
            # Context manager closes the source file once it has been resized.
            with Image.open(img_path) as src:
                img = src.resize((2048, 1024), Image.Resampling.LANCZOS)
            img.save(buf, format="JPEG", quality=75, optimize=True)
        except Exception as e:
            print("Error encoding panorama:", e)
            buf = BytesIO()
            blank = Image.new("RGB", (2048, 1024), color="white")
            blank.save(buf, format="JPEG", quality=75)
        b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        image_data_list.append(f"data:image/jpeg;base64,{b64}")

    audio_data_list = []
    for ap in audio_paths:
        if ap and os.path.exists(ap):
            with open(ap, "rb") as f:
                audio_data_list.append(base64.b64encode(f.read()).decode("utf-8"))
        else:
            # Keep a placeholder so indices stay aligned with the panoramas.
            audio_data_list.append(None)

    html = f''' EVA 360 - Avión Rojo

EVA 360 – VUELO INMERSIVO

Cargando {len(image_data_list)} panoramas...

'''
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(html)
    return output_path


# ============================================================
# 8. Publishing to GitHub via OAuth (FastAPI)
# ============================================================

# OAuth ``state`` token -> path of the viewer HTML waiting to be published.
_pending_gists = {}

# Upper bound, in seconds, for the GitHub token-exchange request.
_GITHUB_TIMEOUT = 30


def get_space_base_url():
    """Return the public base URL: ``SPACE_HOST`` if set, else localhost:7860."""
    host = os.getenv("SPACE_HOST")
    return f"https://{host}" if host else "http://localhost:7860"


def trigger_github_oauth(html_file_path):
    """Start the GitHub OAuth flow for publishing ``html_file_path``.

    Returns the GitHub authorization URL, or a user-facing error message when
    PyGithub is missing or ``GITHUB_CLIENT_ID`` is not configured.
    """
    from urllib.parse import urlencode

    if not GITHUB_AVAILABLE:
        return "❌ PyGithub no instalado."
    client_id = os.getenv("GITHUB_CLIENT_ID")
    if not client_id:
        return "⚠️ GitHub OAuth no configurado."

    # Register the pending publication only once the flow can really start,
    # so an unconfigured deployment does not pile up dead tokens.
    token = secrets.token_urlsafe(16)
    _pending_gists[token] = html_file_path
    query = urlencode({
        "client_id": client_id,
        "scope": "gist",
        "state": token,
        # urlencode percent-encodes the callback URL (it contains ":" and "/").
        "redirect_uri": f"{get_space_base_url()}/github_callback",
    })
    return f"https://github.com/login/oauth/authorize?{query}"


def _create_viewer_gist(access_token, html_path):
    """Upload ``html_path`` as a public gist and return the gist's HTML URL."""
    from github import InputFileContent

    with open(html_path, "r", encoding="utf-8") as f:
        content = f.read()
    user = Github(access_token).get_user()
    gist = user.create_gist(
        public=True,
        description="EVA 360 - Avión Rojo",
        # PyGithub requires InputFileContent values here, not plain dicts.
        files={f"eva_360_{secrets.token_hex(4)}.html": InputFileContent(content)},
    )
    return gist.html_url


fastapi_app = FastAPI()


@fastapi_app.get("/oauth_redirect.html")
async def serve_redirect_page():
    """Serve the static "authorization completed" page.

    NOTE(review): the body is plain text padded with blank lines; the HTML
    markup was presumably lost from this literal - confirm.
    """
    return HTMLResponse("\n\nAutorización completada\n\n")


@fastapi_app.get("/github_callback")
async def github_callback(request: Request):
    """Handle GitHub's OAuth redirect and publish the pending viewer as a gist.

    Missing ``code``/``state`` redirects to ``/``. An unknown ``state`` or a
    vanished file answers 404; configuration, token-exchange and publishing
    failures answer 500.
    """
    from html import escape
    from fastapi.concurrency import run_in_threadpool

    code = request.query_params.get("code")
    state = request.query_params.get("state")
    if not code or not state:
        return RedirectResponse(url="/")

    client_id = os.getenv("GITHUB_CLIENT_ID")
    client_secret = os.getenv("GITHUB_CLIENT_SECRET")
    if not client_id or not client_secret:
        return HTMLResponse("Error: OAuth no configurado.", status_code=500)

    # Check the one-time state BEFORE contacting GitHub, so a callback with a
    # forged or replayed state never triggers a token exchange.
    html_path = _pending_gists.pop(state, None)
    if not html_path or not os.path.exists(html_path):
        return HTMLResponse("El archivo HTML ya no está disponible.", status_code=404)

    try:
        # requests is blocking; run it off the event loop.
        resp = await run_in_threadpool(
            requests.post,
            "https://github.com/login/oauth/access_token",
            data={"client_id": client_id, "client_secret": client_secret, "code": code, "state": state},
            headers={"Accept": "application/json"},
            timeout=_GITHUB_TIMEOUT,
        )
    except requests.RequestException:
        return HTMLResponse("Error al obtener token", status_code=500)
    if resp.status_code != 200:
        return HTMLResponse("Error al obtener token", status_code=500)

    try:
        access_token = resp.json().get("access_token")
    except ValueError:
        # Body was not JSON; treat it as a missing token.
        access_token = None
    if not access_token:
        return HTMLResponse("No se recibió access_token", status_code=500)

    try:
        gist_url = await run_in_threadpool(_create_viewer_gist, access_token, html_path)
    except Exception as e:
        # Escape the message: exception text is not trusted HTML.
        return HTMLResponse(f"Error: {escape(str(e))}", status_code=500)
    return HTMLResponse(f"\n\n✅ Publicado\n\n{gist_url}\n\n")


# ============================================================
# 9. Gradio interface (final version, without oversized icons)
# ============================================================

theme = gr.themes.Soft(primary_hue="red", secondary_hue="orange", font=gr.themes.GoogleFont("Inter"))

# Kept as one logical line (split only in source) to preserve the exact text.
custom_css = (
    " .download-section { background: #f8f9fa; padding: 15px; border-radius: 12px; margin: 15px 0; }"
    " audio { max-height: 50px !important; width: 100% !important; }"
    " .file-download button, .file-download a { font-size: 12px !important; padding: 4px 8px !important; }"
    " .gr-box { border-radius: 12px; }"
    " .loader { border: 3px solid #f3f3f3; border-top: 3px solid #f53737; border-radius: 50%;"
    " width: 40px; height: 40px; animation: spin 1s linear infinite; margin: 20px auto; }"
    " @keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } } "
)

MAX_SEGMENTS = 10
output_containers = []
group_components = []

# Per-segment output components, in the order used by every outputs list.
_SEGMENT_FIELDS = ("emotion", "transcription", "sentiment", "image", "image_360", "music", "music_file")


def process_and_display(audio_input, generate_audio, chunk_duration):
    """Gradio generator handler: process the audio and fill the segment tabs.

    Yields twice: first a "loading" state with every tab hidden, then the
    final state with one tab per processed segment (at most ``MAX_SEGMENTS``
    are displayed) plus the path of the generated viewer HTML. When no audio
    was provided, a single warning state is yielded instead.
    """
    if chunk_duration is None or chunk_duration <= 0:
        chunk_duration = 10

    hidden_tabs = [gr.update(visible=False)] * len(group_components)
    empty_fields = [None] * (MAX_SEGMENTS * len(_SEGMENT_FIELDS))

    if not audio_input:
        # Nothing to process: skip the expensive pipeline entirely.
        yield ([gr.update(visible=False)] + hidden_tabs + empty_fields
               + [None, "⚠️ No se ha proporcionado audio.", ""])
        return

    yield ([gr.update(visible=True)] + hidden_tabs + empty_fields + [None, None, ""])

    results = get_predictions(audio_input, generate_audio, chunk_duration)
    outputs = []
    group_vis = []
    all_360_images = []
    all_music_paths = []
    for i in range(MAX_SEGMENTS):
        if i >= len(results):
            group_vis.append(gr.update(visible=False))
            outputs.extend([None] * len(_SEGMENT_FIELDS))
            continue
        res = results[i]
        group_vis.append(gr.update(visible=True))
        # The music path feeds both the player and the download widget.
        outputs.extend([res["emotion"], res["transcription"], res["sentiment"],
                        res["image"], res["image_360"], res["music"], res["music"]])
        if res["image_360"]:
            all_360_images.append(res["image_360"])
            # Append even when music is None so audio index k always belongs
            # to panorama k; the viewer builder accepts None entries.
            all_music_paths.append(res["music"])

    viewer_html_path = None
    if all_360_images:
        with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as tmp:
            viewer_html_path = create_360_viewer_html(all_360_images, all_music_paths, tmp.name)
    info_msg = f"✅ Entorno generado con {len(results)} segmentos."
    yield ([gr.update(visible=False)] + group_vis + outputs + [viewer_html_path, info_msg, ""])


def clear_all():
    """Reset every input and output component to its initial state."""
    return ([None]
            + [gr.update(visible=False)] * len(group_components)
            + [None] * (MAX_SEGMENTS * len(_SEGMENT_FIELDS))
            + [gr.update(visible=False), 10, None, None, ""])


def publish_to_github(html_file_path):
    """Return Markdown with the GitHub authorization link, or an error message."""
    if not html_file_path or not os.path.exists(html_file_path):
        return "❌ No hay ningún entorno generado."
    if not GITHUB_AVAILABLE:
        return "❌ PyGithub no instalado."
    auth_url = trigger_github_oauth(html_file_path)
    if auth_url.startswith("http"):
        return f"🔐 [Autorizar en GitHub]({auth_url}) (se abrirá una ventana)"
    # Anything else is already a user-facing error message.
    return auth_url


# NOTE(review): the nesting of the layout below was reconstructed from
# whitespace-collapsed source - confirm it against the intended UI.
with gr.Blocks(theme=theme, css=custom_css, title="EVA 360 - Avión Rojo") as interface:
    gr.Markdown("# ✈️ EVA 360: Entornos Virtuales Afectivos con Avión Rojo")
    with gr.Row():
        with gr.Column(scale=2):
            audio_input = gr.Audio(label="🎤 Audio de entrada", type="filepath", sources=["microphone", "upload"])
        with gr.Column(scale=1):
            chunk_duration = gr.Slider(label="⏱️ Segmento (segundos)", value=10, minimum=3, maximum=30, step=1)
            gen_music = gr.Checkbox(label="🎵 Generar música", value=False)
    with gr.Row():
        process_btn = gr.Button("🚀 Generar EVA", variant="primary")
        clear_btn = gr.Button("🗑️ Limpiar", variant="secondary")
    # NOTE(review): plain text padded with blank lines; the loader markup was
    # presumably lost from this literal - confirm.
    loading_indicator = gr.HTML("\n\nProcesando...\n\n", visible=False)

    with gr.Tabs():
        for i in range(MAX_SEGMENTS):
            with gr.TabItem(f"Segmento {i+1}", visible=False) as tab:
                with gr.Group():
                    with gr.Row():
                        emo = gr.Label(label="😌 Emoción")
                        sent = gr.Label(label="💭 Sentimiento")
                    trans = gr.Textbox(label="📝 Transcripción", lines=2)
                    with gr.Row():
                        img_out = gr.Image(label="🖼️ Panorama 360°", height=240)
                        img_file = gr.File(label="📁 Descargar imagen")
                    with gr.Row():
                        audio_out = gr.Audio(label="🎶 Música")
                        audio_file = gr.File(label="📁 Descargar música")
            group_components.append(tab)
            output_containers.append({
                "emotion": emo,
                "transcription": trans,
                "sentiment": sent,
                "image": img_out,
                "image_360": img_file,
                "music": audio_out,
                "music_file": audio_file,
            })

    with gr.Group(elem_classes="download-section"):
        html_download = gr.File(label="📥 Descargar visor HTML (avión + transiciones)", type="filepath", interactive=False)
        publish_btn = gr.Button("📤 Publicar en GitHub (Gist)", variant="primary")
        publish_status = gr.Markdown("")
    js_output = gr.HTML(visible=False)

    # Flattened per-segment components, in the order the handlers return them.
    segment_outputs = [cont[key] for cont in output_containers for key in _SEGMENT_FIELDS]

    process_btn.click(
        fn=process_and_display,
        inputs=[audio_input, gen_music, chunk_duration],
        outputs=[loading_indicator] + group_components + segment_outputs
        + [html_download, publish_status, js_output],
    )
    clear_btn.click(
        fn=clear_all,
        inputs=[],
        outputs=[audio_input] + group_components + segment_outputs
        + [loading_indicator, chunk_duration, html_download, publish_status, js_output],
    )
    publish_btn.click(fn=publish_to_github, inputs=[html_download], outputs=[publish_status])

# The OAuth routes above are registered before Gradio is mounted at "/".
app = mount_gradio_app(fastapi_app, interface, path="/")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)