Spaces:
Sleeping
Sleeping
| """AI Radio - Personalized Radio Station with AI Host""" | |
| import gradio as gr | |
| import os | |
| import json | |
| import time | |
| import re | |
| from typing import Dict, Any, List, Optional | |
| import threading | |
| from pydub import AudioSegment | |
# Filesystem layout: the project root is the directory that contains the
# folder holding this file; generated audio and logs live directly under it.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
AUDIO_DIR, LOG_DIR = (os.path.join(PROJECT_ROOT, sub) for sub in ("audio", "logs"))
# Create both output folders up front; exist_ok makes this a no-op on re-runs.
os.makedirs(AUDIO_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
| from config import get_config | |
| from radio_agent import RadioAgent | |
| from tts_service import TTSService | |
| from rag_system import RadioRAGSystem | |
| from voice_input import VoiceInputService | |
| from user_memory import UserMemoryService | |
def create_youtube_player_html(youtube_id: str, title: str, artist: str, url: str) -> str:
    """Create the auto-playing YouTube player iframe HTML for a track.

    Args:
        youtube_id: YouTube video ID inserted into the embed URL. It is
            percent-encoded first so that quotes, angle brackets or spaces in
            a malformed ID cannot break out of the ``src`` attribute. IDs made
            of letters, digits, ``-`` and ``_`` are left unchanged.
        title: Track title. Accepted for interface compatibility; not rendered.
        artist: Track artist. Accepted for interface compatibility; not rendered.
        url: Original track URL. Accepted for interface compatibility; not rendered.

    Returns:
        An HTML snippet containing a single ``<iframe>`` with autoplay enabled.
    """
    from urllib.parse import quote

    # safe="" also encodes "/" so the ID can never add extra path segments.
    safe_id = quote(str(youtube_id), safe="")
    return f"""
    <iframe width="100%" height="500"
    src="https://www.youtube.com/embed/{safe_id}?autoplay=1"
    allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture"
    allowfullscreen>
    </iframe>
    """
def create_podcast_player_html(youtube_id: str, title: str, host: str, url: str) -> str:
    """Create the auto-playing YouTube player iframe HTML for a podcast.

    Args:
        youtube_id: YouTube video ID inserted into the embed URL. It is
            percent-encoded first so that quotes, angle brackets or spaces in
            a malformed ID cannot break out of the ``src`` attribute. IDs made
            of letters, digits, ``-`` and ``_`` are left unchanged.
        title: Podcast title. Accepted for interface compatibility; not rendered.
        host: Podcast host. Accepted for interface compatibility; not rendered.
        url: Original podcast URL. Accepted for interface compatibility; not rendered.

    Returns:
        An HTML snippet containing a single ``<iframe>`` with autoplay enabled.
    """
    from urllib.parse import quote

    # safe="" also encodes "/" so the ID can never add extra path segments.
    safe_id = quote(str(youtube_id), safe="")
    return f"""
    <iframe width="100%" height="500"
    src="https://www.youtube.com/embed/{safe_id}?autoplay=1"
    allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture"
    allowfullscreen>
    </iframe>
    """
def create_placeholder_html(title: str, artist: str, delay_s: float, media_type: str = "music") -> str:
    """Create placeholder HTML shown while waiting for host speech to end.

    Args:
        title: Title of the upcoming track or podcast. HTML-escaped before
            rendering because it comes from external metadata.
        artist: Artist or host name shown after "by". HTML-escaped as well.
        delay_s: Seconds until the real player appears; rendered rounded to
            whole seconds.
        media_type: ``"music"`` selects the music colour and icon; any other
            value selects the podcast colour and icon.

    Returns:
        An HTML card with a countdown message and no iframe.
    """
    from html import escape

    is_music = media_type == "music"
    bg_color = "#1a1a2e" if is_music else "#6b46c1"
    icon = "π΅" if is_music else "ποΈ"
    # str() keeps the previous rendering for non-string values (e.g. None),
    # escape() stops markup in the metadata from being injected into the page.
    safe_title = escape(str(title))
    safe_artist = escape(str(artist))
    return f"""
    <div style="padding: 1rem; background: linear-gradient(135deg, {bg_color} 0%, #16213e 100%); border-radius: 12px; margin: 1rem 0; box-shadow: 0 4px 15px rgba(0,0,0,0.3);">
    <h4 style="margin: 0 0 0.75rem 0; color: #fff; font-size: 1.1em;">{icon} {safe_title}</h4>
    <p style="margin: 0 0 0.75rem 0; color: #aaa; font-size: 0.9em;">by {safe_artist}</p>
    <div style="position: relative; width: 100%; padding-bottom: 56.25%; border-radius: 8px; overflow: hidden; background: #000; display: flex; align-items: center; justify-content: center;">
    <div style="position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); text-align: center; color: #fff;">
    <div style="font-size: 3em; margin-bottom: 0.5rem;">β³</div>
    <p style="margin: 0; font-size: 0.9em;">Video starts in {delay_s:.0f}s...</p>
    <p style="margin: 0.5rem 0 0 0; font-size: 0.8em; color: #888;">After host finishes speaking</p>
    </div>
    </div>
    </div>
    """
def get_pending_player():
    """Return the queued player HTML once its ready time has passed.

    Reads ``radio_state["pending_player"]``. If an entry exists and the
    current time has reached its ``ready_at`` timestamp (treated as 0 when
    missing), the entry is cleared from the state and its ``html`` value
    (empty string when missing) is returned. Otherwise returns ``None`` and
    leaves the state untouched.
    """
    queued = radio_state.get("pending_player")
    if not queued:
        return None
    is_due = time.time() >= queued.get("ready_at", 0)
    if not is_due:
        return None
    player_markup = queued.get("html", "")
    # Consume the entry so the same player is handed out only once.
    radio_state["pending_player"] = None
    return player_markup
# Module-wide playback state shared by every handler in this file.
radio_state = {
    # -- playback status --
    "is_playing": False,
    "is_stopped": False,  # set when the user explicitly stops (as opposed to a pause)
    "current_segment_index": 0,
    "planned_show": [],
    "user_preferences": {},
    "stop_flag": False,
    # -- batched news playback --
    "current_news_batches": [],
    "news_total_batches": 0,
    "news_batches_played": 0,
    "last_segment": None,
    # -- per-user data --
    "current_track": None,  # track currently playing, used by like/dislike
    "user_id": None,  # ID of the current user
    "auto_play": True,  # advance to the next segment automatically
    # Every content category is enabled until preferences say otherwise.
    "content_filter": dict.fromkeys(("music", "news", "podcasts", "stories"), True),
    # Media player held back until host speech ends:
    # {"html": "...", "ready_at": timestamp}
    "pending_player": None,
}
# Build the long-lived service objects once, at import time.
config = get_config()
agent = RadioAgent(config)
# NOTE(review): the previous comment here said an open-source local TTS is
# used instead of ElevenLabs, yet the ElevenLabs key and voice ID from config
# are what get passed in (empty strings when absent) - confirm which is true.
tts_service = TTSService(
    api_key=getattr(config, "elevenlabs_api_key", ""),
    voice_id=getattr(config, "elevenlabs_voice_id", ""),
)
voice_input_service = VoiceInputService()
user_memory = UserMemoryService()
def save_preferences(name: str, favorite_genres: List[str], interests: List[str],
                     podcast_interests: List[str], mood: str,
                     music_filter: bool, news_filter: bool, podcast_filter: bool, story_filter: bool,
                     voice_name: Optional[str] = None) -> str:
    """Save user preferences to the RAG system and to user memory.

    Creates the user on first use, resolves the chosen host voice, updates
    ``radio_state`` and the TTS service, then persists the preferences.

    Args:
        name: Listener name; falls back to "Friend" when empty.
        favorite_genres: Music genres; falls back to ["pop"] when empty.
        interests: News interests; falls back to ["world"] when empty.
        podcast_interests: Podcast topics; falls back to ["technology"] when empty.
        mood: Current mood; falls back to "happy" when empty.
        music_filter: Whether music segments are enabled.
        news_filter: Whether news segments are enabled.
        podcast_filter: Whether podcast segments are enabled.
        story_filter: Whether story segments are enabled.
        voice_name: Display name of the host voice. When missing or not a key
            of ``VOICE_OPTIONS``, the default voice is used and the stored
            name is the default name, so name and ID always agree.

    Returns:
        A confirmation message addressed to the listener.
    """
    # Initialize user if not already done
    if not radio_state.get("user_id"):
        user_id, _, _ = user_memory.get_or_create_user()
        radio_state["user_id"] = user_id
        print(f"π€ User initialized: {radio_state['user_id']}")

    # Map voice name to voice_id if needed
    from tts_service import VOICE_OPTIONS
    if voice_name and voice_name in VOICE_OPTIONS:
        selected_voice_id = VOICE_OPTIONS[voice_name]
    else:
        # Unknown or missing name: fall back for BOTH the ID and the name.
        # Previously only the ID fell back, so an unrecognised name could be
        # persisted next to the default voice's ID.
        voice_name = "Rachel (Female)"
        selected_voice_id = "21m00Tcm4TlvDq8ikWAM"  # Default Rachel

    preferences = {
        "name": name or "Friend",
        "favorite_genres": favorite_genres or ["pop"],
        "interests": interests or ["world"],
        "podcast_interests": podcast_interests or ["technology"],
        "mood": mood or "happy",
        "voice_id": selected_voice_id,
        "voice_name": voice_name,
        "content_filter": {
            "music": music_filter,
            "news": news_filter,
            "podcasts": podcast_filter,
            "stories": story_filter,
        },
        "timestamp": time.time(),
    }

    # Update content filter
    radio_state["content_filter"] = preferences["content_filter"]
    radio_state["user_preferences"] = preferences

    # Update TTS service with new voice (attribute mutation only, so no
    # ``global`` declaration is needed)
    tts_service.voice_id = selected_voice_id
    print(f"π€ Voice updated to: {voice_name} (ID: {selected_voice_id})")

    # Save to RAG system with user_id
    user_id = radio_state.get("user_id")
    agent.rag_system.store_user_preferences(preferences, user_id=user_id)

    # Save to user memory for persistence
    if user_id:
        user_memory.save_user_preferences(user_id, preferences)
        print(f"πΎ Preferences saved for user {user_id}")

    return f"β Preferences saved! Welcome, {preferences['name']}! Your personalized radio is ready."
def get_saved_preferences() -> tuple:
    """Return the current user's stored preferences in form-field order.

    Used to autofill the preferences form. When no user is set, or the user
    has no stored preferences, a fixed set of defaults is returned.

    Returns:
        Tuple of (name, genres, interests, podcast_interests, mood, music, news, podcast, story, voice_name)
    """
    fallback = (
        "Friend", ["pop", "rock"], ["technology", "world"], ["technology"],
        "happy", True, True, True, True, "Rachel (Female)",
    )

    current_user = radio_state.get("user_id")
    prefs = user_memory.get_user_preferences(current_user) if current_user else None
    if not prefs:
        return fallback

    filters = prefs.get("content_filter", {})

    # Prefer the stored display name; if it is missing or not one of the
    # recognised labels, recover it from the stored voice ID instead.
    recognised_labels = [
        "Rachel (Female)", "Lera (Female)", "Bella (Female)", "Antoni (Male)",
        "Arnold (Male)", "Adam (Male)", "Domi (Female)", "Elli (Female)",
        "Josh (Male)", "Sam (Male)",
    ]
    voice_name = prefs.get("voice_name", "Rachel (Female)")
    if not voice_name or voice_name not in recognised_labels:
        from tts_service import VOICE_OPTIONS
        stored_id = prefs.get("voice_id", "21m00Tcm4TlvDq8ikWAM")
        # First label whose ID matches, otherwise the default voice.
        voice_name = next(
            (label for label, vid in VOICE_OPTIONS.items() if vid == stored_id),
            "Rachel (Female)",
        )

    return (
        prefs.get("name", "Friend"),
        prefs.get("favorite_genres", ["pop", "rock"]),
        prefs.get("interests", ["technology", "world"]),
        prefs.get("podcast_interests", ["technology"]),
        prefs.get("mood", "happy"),
        filters.get("music", True),
        filters.get("news", True),
        filters.get("podcasts", True),
        filters.get("stories", True),
        voice_name,
    )
def start_radio_stream():
    """Start the radio stream by planning a new show.

    Ensures ``radio_state["user_preferences"]`` is populated (from user
    memory when a user ID is known, otherwise from built-in defaults), keeps
    the TTS voice in sync with those preferences, then plans a 30-minute show
    and resets the playback counters.

    Returns:
        A 5-tuple ``(status_message, None, None, None, "")``. The status is
        the "already playing" message when a show is in progress, in which
        case nothing is re-planned.
    """
    default_voice_id = "21m00Tcm4TlvDq8ikWAM"

    # Use saved preferences, or set defaults if none exist
    if not radio_state["user_preferences"]:
        # Try to load from user memory
        user_id = radio_state.get("user_id")
        if user_id:
            saved = user_memory.get_user_preferences(user_id)
            if saved:
                radio_state["user_preferences"] = saved
                radio_state["content_filter"] = saved.get("content_filter", {
                    "music": True, "news": True, "podcasts": True, "stories": True
                })
                # Apply the stored voice so the host speaks with the user's
                # choice, matching what start_and_play_first_segment does.
                tts_service.voice_id = saved.get("voice_id", default_voice_id)

    # Still no preferences? Use defaults
    if not radio_state["user_preferences"]:
        radio_state["user_preferences"] = {
            "name": "Friend",
            "favorite_genres": ["pop", "rock"],
            "interests": ["technology", "world"],
            "podcast_interests": ["technology"],
            "mood": "happy",
            "voice_id": default_voice_id,
            "voice_name": "Rachel (Female)"
        }
        radio_state["content_filter"] = {
            "music": True, "news": True, "podcasts": True, "stories": True
        }
        # Default preferences imply the default voice.
        tts_service.voice_id = default_voice_id

    if radio_state["is_playing"]:
        return "π» Radio is already playing!", None, None, None, ""

    # Plan the show with content filter
    user_id = radio_state.get("user_id")
    show_plan = agent.plan_radio_show(
        user_id=user_id,
        user_preferences=radio_state["user_preferences"],
        duration_minutes=30,
        content_filter=radio_state["content_filter"]
    )

    radio_state["planned_show"] = show_plan
    radio_state["current_segment_index"] = 0
    radio_state["is_playing"] = True
    radio_state["stop_flag"] = False
    # A freshly planned show is not "stopped"; clear any stale flag left over
    # from a previous show.
    radio_state["is_stopped"] = False
    radio_state["current_news_batches"] = []
    radio_state["news_total_batches"] = 0
    radio_state["news_batches_played"] = 0
    radio_state["last_segment"] = None
    return "π΅ Starting your personalized radio show...", None, None, None, ""
def show_pending_player():
    """Timer callback that reveals the media player and switches the timer off.

    Returns:
        A pair ``(player_html, timer)``: the HTML stored under
        ``radio_state["pending_player_html"]`` (empty string when absent) and
        an inactive ``gr.Timer``.
    """
    # NOTE(review): this reads "pending_player_html", while get_pending_player
    # uses the "pending_player" entry - confirm where this key is written.
    pending_markup = radio_state.get("pending_player_html", "")
    print("β° Timer fired: Showing pending media player now")
    # An inactive timer stops this callback from firing again.
    stopped_timer = gr.Timer(value=0, active=False)
    return pending_markup, stopped_timer
def start_and_play_first_segment():
    """One-shot UI helper: resume a stopped show or plan and start a new one.

    Steps, in order:
      1. Create the user if ``radio_state`` has no user ID yet.
      2. Make sure preferences exist (saved ones from user memory, otherwise
         built-in defaults) and point the TTS service at the matching voice.
      3. If the show was stopped part-way, resume with the next segment.
      4. If a show is genuinely in progress, report that and do nothing.
      5. Otherwise reset, plan a 30-minute show and play its first segment.

    Returns:
        An 8-tuple ``(status, segment_info, host_audio, music_html, progress,
        now_playing, llm_script, timer_config)`` so the UI can be updated in
        a single call.
    """
    print("βΆοΈ [start_and_play_first_segment] Starting...")

    # Initialize user if not already done
    if not radio_state.get("user_id"):
        user_id, _, _ = user_memory.get_or_create_user()
        radio_state["user_id"] = user_id
        print(f"π€ User initialized: {radio_state['user_id']}")

    # Use saved preferences, or set defaults if none exist
    if not radio_state["user_preferences"]:
        # Try to load from user memory
        user_id = radio_state.get("user_id")
        if user_id:
            saved = user_memory.get_user_preferences(user_id)
            if saved:
                radio_state["user_preferences"] = saved
                radio_state["content_filter"] = saved.get("content_filter", {
                    "music": True, "news": True, "podcasts": True, "stories": True
                })
                # Update TTS service with saved voice
                voice_id = saved.get("voice_id", "21m00Tcm4TlvDq8ikWAM")
                tts_service.voice_id = voice_id
                print(f"π Loaded saved preferences for user {user_id}")
                print(f"π€ Voice set to: {saved.get('voice_name', 'Rachel (Female)')} (ID: {voice_id})")

    # Still no preferences? Use defaults
    if not radio_state["user_preferences"]:
        radio_state["user_preferences"] = {
            "name": "Friend",
            "favorite_genres": ["pop", "rock"],
            "interests": ["technology", "world"],
            "podcast_interests": ["technology"],
            "mood": "happy",
            "voice_id": "21m00Tcm4TlvDq8ikWAM",
            "voice_name": "Rachel (Female)"
        }
        radio_state["content_filter"] = {
            "music": True, "news": True, "podcasts": True, "stories": True
        }
        # Update TTS service with default voice
        tts_service.voice_id = "21m00Tcm4TlvDq8ikWAM"
        print("π Using default preferences")

    # Check if resuming from stopped state (only if show was manually stopped, not completed)
    if (
        radio_state.get("is_stopped")
        and radio_state.get("planned_show")
        and radio_state["current_segment_index"] < len(radio_state["planned_show"])
    ):
        print("βΆοΈ Resuming from stopped state...")
        radio_state["is_playing"] = True
        radio_state["is_stopped"] = False
        radio_state["stop_flag"] = False
        # Play next segment (continuing from where we stopped)
        segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment()
        return (
            "βΆοΈ Resuming radio...",
            segment_info,
            host_audio,
            music_html,
            progress,
            now_playing,
            llm_script,
            timer_config
        )

    # If show completed or no valid show to resume, start fresh
    # Also check if is_playing is True but show is actually completed
    if radio_state.get("is_playing") and radio_state.get("planned_show"):
        # Check if show is actually completed
        if radio_state["current_segment_index"] >= len(radio_state["planned_show"]):
            # Show completed, reset and start fresh
            print("π Show completed, resetting for new show...")
            reset_radio()
        else:
            # Show is actually playing
            return (
                "π» Radio is already playing!",
                "**Welcome!** Set your preferences and start the radio.",
                None, None, "Ready to start", "π» AI Radio",
                "Model-generated script will appear here for each segment.",
                gr.Timer(value=0, active=False)
            )
    elif radio_state.get("is_playing"):
        # is_playing is True but no planned show - reset
        print("π is_playing=True but no planned_show, resetting...")
        reset_radio()

    # The module-level ``time`` import is used here; the former function-local
    # ``import time`` made ``time`` a local name for the whole function.
    t0 = time.time()

    # Fresh start - reset and plan new show
    reset_radio()

    # Step 1: Plan skeleton show (YouTube searches happen here)
    print(" [1/3] Planning show (searching YouTube)...")
    user_id = radio_state.get("user_id")
    show_plan = agent.plan_radio_show(
        user_id=user_id,
        user_preferences=radio_state["user_preferences"],
        duration_minutes=30,
        content_filter=radio_state["content_filter"]
    )
    print(f" [1/3] Done in {time.time()-t0:.1f}s, segments: {len(show_plan)}")

    radio_state["planned_show"] = show_plan
    radio_state["current_segment_index"] = 0
    radio_state["is_playing"] = True
    radio_state["stop_flag"] = False
    radio_state["is_stopped"] = False

    # Step 2 & 3: Generate and play first segment (LLM + TTS inside play_next_segment)
    print(" [2/3] Generating first segment (LLM)...")
    t1 = time.time()
    segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment()
    print(f" [2/3] Done in {time.time()-t1:.1f}s")
    print(f" [3/3] Total time: {time.time()-t0:.1f}s")
    print(f" Host audio file: {host_audio}")

    return (
        "π΅ Starting your personalized radio show...",
        segment_info,
        host_audio,
        music_html,
        progress,
        now_playing,
        llm_script,
        timer_config
    )
| def play_next_segment(): | |
| """Play the next segment in the show - returns host audio and music audio separately""" | |
| # If stopped, resume first | |
| if radio_state.get("is_stopped"): | |
| print("βΆοΈ Resuming from stopped state in play_next_segment...") | |
| radio_state["is_stopped"] = False | |
| radio_state["stop_flag"] = False | |
| radio_state["is_playing"] = True | |
| if not radio_state["is_playing"]: | |
| # If we have a planned show, try to resume | |
| if radio_state.get("planned_show"): | |
| radio_state["is_playing"] = True | |
| radio_state["stop_flag"] = False | |
| else: | |
| return "βΈοΈ No show planned. Click 'Generate & Play' to start.", None, None, None, "", "", gr.Timer(value=0, active=False) | |
| if radio_state["stop_flag"]: | |
| radio_state["is_playing"] = False | |
| return "βΈοΈ Radio paused", None, None, None, "", "", gr.Timer(value=0, active=False) | |
| # If there are remaining news TTS batches, play the next one without advancing to a new segment | |
| if radio_state.get("current_news_batches"): | |
| segment = radio_state.get("last_segment") | |
| if segment: | |
| batch_index = radio_state.get("news_batches_played", 1) + 1 | |
| next_batch = radio_state["current_news_batches"].pop(0) | |
| radio_state["news_batches_played"] = batch_index | |
| host_audio_file = None | |
| if getattr(tts_service, "available", True): | |
| audio_bytes = tts_service.text_to_speech(next_batch) | |
| if audio_bytes: | |
| host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_{batch_index}.mp3") | |
| tts_service.save_audio(audio_bytes, host_audio_file) | |
| # Build display info (same news segment) | |
| segment_info = format_segment_info(segment) | |
| raw_script = get_segment_text(segment) | |
| script_batches = segment.get("script_batches", segment.get("script", [])) | |
| if isinstance(script_batches, list) and script_batches: | |
| display_script = "\n\n---\n\n".join(str(b) for b in script_batches) | |
| else: | |
| display_script = str(raw_script) if raw_script else "" | |
| total = radio_state.get("news_total_batches", 1) or 1 | |
| progress = f"News batch {batch_index}/{total}" | |
| # When no more batches, clear counters so next click advances to next segment | |
| if not radio_state["current_news_batches"]: | |
| radio_state["news_total_batches"] = 0 | |
| radio_state["news_batches_played"] = 0 | |
| return segment_info, host_audio_file, "", progress, get_now_playing(segment), display_script, gr.Timer(value=0, active=False) | |
| if radio_state["current_segment_index"] >= len(radio_state["planned_show"]): | |
| radio_state["is_playing"] = False | |
| return "π Show completed! Hope you enjoyed it.", None, None, None, "", "", gr.Timer(value=0, active=False) | |
| # Get current segment | |
| segment_index = radio_state["current_segment_index"] | |
| segment = radio_state["planned_show"][segment_index] | |
| radio_state["current_segment_index"] += 1 | |
| # Lazily generate LLM content for this segment | |
| user_prefs = radio_state["user_preferences"].copy() | |
| user_id = radio_state.get("user_id") | |
| if user_id: | |
| user_prefs["_user_id"] = user_id # Pass user_id in preferences for RAG queries | |
| segment = agent.enrich_segment(segment, user_prefs) | |
| radio_state["planned_show"][segment_index] = segment | |
| # Execute segment (log to RAG) | |
| agent.execute_segment(segment, user_id=user_id) | |
| radio_state["last_segment"] = segment | |
| # Generate content display and underlying script text | |
| segment_info = format_segment_info(segment) | |
| raw_script = get_segment_text(segment) | |
| # Generate TTS for host commentary | |
| host_audio_file = None | |
| music_player_html = "" | |
| if segment["type"] in ["intro", "outro", "story"]: | |
| text = raw_script | |
| if text and getattr(tts_service, "available", True): | |
| audio_bytes = tts_service.text_to_speech(text) | |
| if audio_bytes: | |
| host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}.mp3") | |
| tts_service.save_audio(audio_bytes, host_audio_file) | |
| elif segment["type"] == "news": | |
| # Handle batched news generation | |
| script_batches = segment.get("script_batches", segment.get("script", [])) | |
| if isinstance(script_batches, list) and script_batches: | |
| # Generate first batch immediately (user doesn't wait) | |
| first_batch = script_batches[0] | |
| if getattr(tts_service, "available", True): | |
| audio_bytes = tts_service.text_to_speech(first_batch) | |
| if audio_bytes: | |
| host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_1.mp3") | |
| tts_service.save_audio(audio_bytes, host_audio_file) | |
| # Store remaining batches for sequential playback | |
| segment["remaining_batches"] = script_batches[1:] if len(script_batches) > 1 else [] | |
| else: | |
| # Fallback for old format | |
| text = raw_script | |
| if text and getattr(tts_service, "available", True): | |
| audio_bytes = tts_service.text_to_speech(text) | |
| if audio_bytes: | |
| host_audio_file = f"segment_{radio_state['current_segment_index']}.mp3" | |
| tts_service.save_audio(audio_bytes, host_audio_file) | |
| elif segment["type"] == "music": | |
| # For music: generate host commentary, then get music | |
| commentary = segment.get("commentary", "") | |
| # Generate host commentary | |
| if commentary and getattr(tts_service, "available", True): | |
| audio_bytes = tts_service.text_to_speech(commentary) | |
| if audio_bytes: | |
| host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_host.mp3") | |
| tts_service.save_audio(audio_bytes, host_audio_file) | |
| # Get music track - check if we should play a liked track | |
| track = segment.get("track", {}) | |
| user_id = radio_state.get("user_id") | |
| # 30% chance to play a liked track if user has any | |
| if user_id and user_memory.should_play_liked_track(user_id, probability=0.3): | |
| genre = track.get("genre", "") if track else "" | |
| liked_track = user_memory.get_random_liked_track(user_id, genre) | |
| if liked_track: | |
| print(f"β€οΈ Playing liked track: {liked_track.get('title', 'Unknown')}") | |
| track = liked_track | |
| segment["track"] = liked_track | |
| segment["from_liked"] = True | |
| if track: | |
| # Store current track for like/dislike buttons | |
| radio_state["current_track"] = track | |
| # Add to user history | |
| if user_id: | |
| user_memory.add_to_history(user_id, track) | |
| print(f"π΅ Preparing music: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}") | |
| # Get streaming URL for YouTube or SoundCloud | |
| # Modal proxy returns "youtube_api" as source, so check both | |
| if track.get("source") in ["youtube", "youtube_api"] and track.get("url"): | |
| try: | |
| # Extract YouTube ID | |
| youtube_id = track.get("youtube_id", "") | |
| if not youtube_id and "v=" in track["url"]: | |
| youtube_id = track["url"].split("v=")[-1].split("&")[0] | |
| if youtube_id: | |
| # # COMMENTED OUT: Delayed loading mechanism | |
| # # Get host audio duration to calculate delay | |
| # delay_ms = 0 | |
| # delay_s = 0.0 | |
| # if host_audio_file and os.path.exists(host_audio_file): | |
| # try: | |
| # audio = AudioSegment.from_file(host_audio_file) | |
| # duration_ms = len(audio) | |
| # delay_ms = max(0, duration_ms - 3000) # Start 3 seconds before end | |
| # delay_s = delay_ms / 1000.0 | |
| # print(f"β±οΈ Host audio: {duration_ms}ms, YouTube delay: {delay_ms}ms") | |
| # except Exception as e: | |
| # print(f"β οΈ Could not get audio duration: {e}") | |
| # # Store the actual iframe HTML for later (after host speech ends) | |
| # actual_player_html = create_youtube_player_html( | |
| # youtube_id, | |
| # track.get('title', 'Unknown'), | |
| # track.get('artist', 'Unknown'), | |
| # track['url'] | |
| # ) | |
| # # Schedule the player to be ready after the delay | |
| # radio_state["pending_player"] = { | |
| # "html": actual_player_html, | |
| # "ready_at": time.time() + delay_s, | |
| # "type": "music", | |
| # "title": track.get('title', 'Unknown') | |
| # } | |
| # # Return placeholder initially (no iframe yet!) | |
| # music_player_html = create_placeholder_html( | |
| # track.get('title', 'Unknown'), | |
| # track.get('artist', 'Unknown'), | |
| # delay_s, | |
| # "music" | |
| # ) | |
| # print(f"β YouTube player scheduled: {track.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s") | |
| # Simple immediate iframe - start track with host | |
| music_player_html = create_youtube_player_html( | |
| youtube_id, | |
| track.get('title', 'Unknown'), | |
| track.get('artist', 'Unknown'), | |
| track['url'] | |
| ) | |
| print(f"β YouTube player created immediately: {track.get('title', 'Unknown')} (ID: {youtube_id})") | |
| else: | |
| # Fallback: Just show link | |
| music_player_html = f""" | |
| <div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;"> | |
| <h4>π΅ {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}</h4> | |
| <p>Click to listen:</p> | |
| <a href="{track['url']}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff0000; color: white; text-decoration: none; border-radius: 5px;"> | |
| βΆοΈ Play on YouTube | |
| </a> | |
| </div> | |
| """ | |
| except Exception as e: | |
| print(f"β Error getting music: {e}") | |
| # Fallback: Just show YouTube link | |
| music_player_html = f""" | |
| <div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;"> | |
| <h4>π΅ {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}</h4> | |
| <p>Click to listen:</p> | |
| <a href="{track['url']}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff0000; color: white; text-decoration: none; border-radius: 5px;"> | |
| βΆοΈ Play on YouTube | |
| </a> | |
| </div> | |
| """ | |
| elif track.get("source") == "soundcloud" and track.get("url"): | |
| # Simple SoundCloud embed player | |
| try: | |
| music_player_html = f""" | |
| <div style="padding: 1rem; background: linear-gradient(135deg, #ff5500 0%, #ff8800 100%); border-radius: 12px; margin: 1rem 0; box-shadow: 0 4px 15px rgba(0,0,0,0.3);"> | |
| <h4 style="margin: 0 0 0.75rem 0; color: #fff; font-size: 1.1em;">π΅ {track.get('title', 'Unknown')}</h4> | |
| <p style="margin: 0 0 0.75rem 0; color: #ffe0cc; font-size: 0.9em;">by {track.get('artist', 'Unknown')}</p> | |
| <iframe width="100%" height="166" scrolling="no" frameborder="no" | |
| style="border-radius: 8px;" | |
| src="https://w.soundcloud.com/player/?url={track['url']}&color=%23ffffff&auto_play=true&hide_related=true&show_comments=false&show_user=true&show_reposts=false&show_teaser=false"></iframe> | |
| <p style="margin-top: 0.75rem; font-size: 0.85em; margin-bottom: 0;"> | |
| <a href="{track['url']}" target="_blank" style="color: #fff; text-decoration: none;">π Open on SoundCloud</a> | |
| </p> | |
| </div> | |
| """ | |
| print(f"β Streaming from SoundCloud: {track.get('title', 'Unknown')}") | |
| except Exception as e: | |
| print(f"β Error with SoundCloud: {e}") | |
| music_player_html = f""" | |
| <div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;"> | |
| <h4>π΅ {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}</h4> | |
| <a href="{track['url']}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff5500; color: white; text-decoration: none; border-radius: 5px;"> | |
| βΆοΈ Play on SoundCloud | |
| </a> | |
| </div> | |
| """ | |
| else: | |
| # Demo track - no actual audio file | |
| print("βΉοΈ Demo track (no audio file available)") | |
| music_player_html = "" | |
| elif segment["type"] == "podcast": | |
| # For podcast: generate intro and create YouTube player | |
| intro = segment.get("intro", "") | |
| podcast = segment.get("podcast", {}) | |
| # Initialize music_player_html for podcast | |
| music_player_html = "" | |
| if intro and getattr(tts_service, "available", True): | |
| audio_bytes = tts_service.text_to_speech(intro) | |
| if audio_bytes: | |
| host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_host.mp3") | |
| tts_service.save_audio(audio_bytes, host_audio_file) | |
| # Create YouTube player for podcast if available | |
| # Modal proxy returns "youtube_api" as source, so check both | |
| print(f"π Podcast debug: source={podcast.get('source')}, youtube_id={podcast.get('youtube_id')}, url={podcast.get('url', '')[:50]}") | |
| if podcast.get("source") in ["youtube", "youtube_api"] and podcast.get("youtube_id"): | |
| youtube_id = podcast.get("youtube_id", "") | |
| print(f"β Creating podcast iframe with youtube_id: {youtube_id}") | |
| # # COMMENTED OUT: Delayed loading mechanism | |
| # # Get host audio duration to calculate delay | |
| # delay_ms = 0 | |
| # if host_audio_file and os.path.exists(host_audio_file): | |
| # try: | |
| # audio = AudioSegment.from_file(host_audio_file) | |
| # duration_ms = len(audio) | |
| # delay_ms = max(0, duration_ms - 3000) # Start 3 seconds before end | |
| # print(f"β±οΈ Podcast host audio: {duration_ms}ms, YouTube delay: {delay_ms}ms") | |
| # except Exception as e: | |
| # print(f"β οΈ Could not get audio duration: {e}") | |
| # # Store the actual iframe HTML for later (after host speech ends) | |
| # delay_s = delay_ms / 1000.0 | |
| # actual_player_html = create_podcast_player_html( | |
| # youtube_id, | |
| # podcast.get('title', 'Podcast'), | |
| # podcast.get('host', 'Unknown Host'), | |
| # podcast.get('url', '#') | |
| # ) | |
| # # Schedule the player to be ready after the delay | |
| # radio_state["pending_player"] = { | |
| # "html": actual_player_html, | |
| # "ready_at": time.time() + delay_s, | |
| # "type": "podcast", | |
| # "title": podcast.get('title', 'Unknown') | |
| # } | |
| # # Return placeholder initially (no iframe yet!) | |
| # music_player_html = create_placeholder_html( | |
| # podcast.get('title', 'Podcast'), | |
| # podcast.get('host', 'Unknown Host'), | |
| # delay_s, | |
| # "podcast" | |
| # ) | |
| # print(f"β Podcast YouTube player scheduled: {podcast.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s") | |
| # Simple immediate iframe - start podcast with host | |
| music_player_html = create_podcast_player_html( | |
| youtube_id, | |
| podcast.get('title', 'Podcast'), | |
| podcast.get('host', 'Unknown Host'), | |
| podcast.get('url', '#') | |
| ) | |
| print(f"β Podcast YouTube player created immediately: {podcast.get('title', 'Unknown')} (ID: {youtube_id})") | |
| print(f"π¦ Podcast iframe HTML length: {len(music_player_html)} chars, contains iframe: {'<iframe' in music_player_html}") | |
| elif podcast.get("url") and "youtube" in podcast.get("url", ""): | |
| print(f"β οΈ Podcast: using fallback URL extraction") | |
| # Fallback: extract YouTube ID from URL and create simple embed | |
| url = podcast.get("url", "") | |
| youtube_id = "" | |
| if "v=" in url: | |
| youtube_id = url.split("v=")[-1].split("&")[0] | |
| elif "youtu.be/" in url: | |
| youtube_id = url.split("youtu.be/")[-1].split("?")[0] | |
| if youtube_id: | |
| # # COMMENTED OUT: Delayed loading mechanism | |
| # # Get host audio duration to calculate delay | |
| # delay_ms = 0 | |
| # if host_audio_file and os.path.exists(host_audio_file): | |
| # try: | |
| # audio = AudioSegment.from_file(host_audio_file) | |
| # duration_ms = len(audio) | |
| # delay_ms = max(0, duration_ms - 3000) | |
| # except: | |
| # pass | |
| # delay_s = delay_ms / 1000.0 | |
| # actual_player_html = create_podcast_player_html( | |
| # youtube_id, | |
| # podcast.get('title', 'Podcast'), | |
| # podcast.get('host', 'Unknown Host'), | |
| # url | |
| # ) | |
| # # Schedule the player to be ready after the delay | |
| # radio_state["pending_player"] = { | |
| # "html": actual_player_html, | |
| # "ready_at": time.time() + delay_s, | |
| # "type": "podcast", | |
| # "title": podcast.get('title', 'Unknown') | |
| # } | |
| # # Return placeholder initially | |
| # music_player_html = create_placeholder_html( | |
| # podcast.get('title', 'Podcast'), | |
| # podcast.get('host', 'Unknown Host'), | |
| # delay_s, | |
| # "podcast" | |
| # ) | |
| # print(f"β Podcast iframe player scheduled: {podcast.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s") | |
| # Simple immediate iframe | |
| music_player_html = create_podcast_player_html( | |
| youtube_id, | |
| podcast.get('title', 'Podcast'), | |
| podcast.get('host', 'Unknown Host'), | |
| url | |
| ) | |
| print(f"β Podcast iframe player created immediately: {podcast.get('title', 'Unknown')} (ID: {youtube_id})") | |
| # Progress info | |
| progress = f"Segment {radio_state['current_segment_index']}/{len(radio_state['planned_show'])}" | |
| # Prepare LLM script for UI: split into smaller segments for readability | |
| display_script = "" | |
| if segment["type"] == "news": | |
| # Use news batches if available | |
| script_batches = segment.get("script_batches", segment.get("script", [])) | |
| if isinstance(script_batches, list) and script_batches: | |
| display_script = "\n\n---\n\n".join(str(b) for b in script_batches) | |
| else: | |
| display_script = str(raw_script) if raw_script else "" | |
| else: | |
| if raw_script: | |
| segments = split_text_into_segments(raw_script) | |
| segment["text_segments"] = segments | |
| display_script = "\n\n---\n\n".join(segments) | |
| else: | |
| display_script = "" | |
| # Optionally prefetch next segment's LLM content in the background | |
| def _prefetch_next(): | |
| try: | |
| idx = radio_state["current_segment_index"] | |
| if idx < len(radio_state["planned_show"]): | |
| next_seg = radio_state["planned_show"][idx] | |
| enriched = agent.enrich_segment(next_seg, radio_state["user_preferences"]) | |
| radio_state["planned_show"][idx] = enriched | |
| except Exception: | |
| pass | |
| threading.Thread(target=_prefetch_next, daemon=True).start() | |
| # Calculate segment duration and set up delayed player if needed | |
| timer_config = gr.Timer(value=0, active=False) | |
| # Only delay player if we have both host audio and a music/podcast player | |
| # Check if music_player_html contains iframe (for YouTube/SoundCloud players) | |
| has_iframe = music_player_html and ("<iframe" in music_player_html) | |
| if host_audio_file and os.path.exists(host_audio_file) and music_player_html and has_iframe: | |
| try: | |
| audio = AudioSegment.from_file(host_audio_file) | |
| host_duration_s = len(audio) / 1000.0 | |
| # Start player 3 seconds before host speech ends | |
| delay_s = max(0.5, host_duration_s - 3.0) | |
| print(f"β±οΈ Host duration: {host_duration_s:.1f}s, Scheduling player in {delay_s:.1f}s") | |
| # Store the actual player HTML | |
| radio_state["pending_player_html"] = music_player_html | |
| # Return placeholder/empty player and active timer | |
| music_player_html = create_placeholder_html( | |
| segment.get('track', {}).get('title', 'Content') if segment.get('type') == 'music' else segment.get('podcast', {}).get('title', 'Podcast'), | |
| segment.get('track', {}).get('artist', '') if segment.get('type') == 'music' else segment.get('podcast', {}).get('host', ''), | |
| delay_s, | |
| segment.get('type', 'music') | |
| ) | |
| timer_config = gr.Timer(value=delay_s, active=True) | |
| except Exception as e: | |
| print(f"β οΈ Error calculating audio duration for delay: {e}") | |
| # Fallback: play immediately (leave music_player_html as is, timer inactive) | |
| # Return: segment_info, host_audio, music_player_html, progress, now_playing, segmented LLM script for UI, timer | |
| return segment_info, host_audio_file, music_player_html, progress, get_now_playing(segment), display_script, timer_config | |
def stop_radio():
    """Pause playback at the current segment without discarding the show.

    The planned show and the current segment index are left untouched so that
    playback can be resumed later.

    Returns:
        A 6-tuple of (status message, host audio, music player HTML,
        progress text, now-playing text, timer). Audio is None, the player
        HTML is empty and the timer is inactive.
    """
    # Flip the playback flags; everything else in radio_state is preserved.
    for flag, value in (("stop_flag", True), ("is_playing", False), ("is_stopped", True)):
        radio_state[flag] = value

    position = "{}/{}".format(
        radio_state.get("current_segment_index", 0),
        len(radio_state.get("planned_show", [])),
    )

    return (
        f"βΉοΈ Stopped at segment {position}. Click 'Generate & Play' or 'Next Segment' to continue.",
        None,  # no host audio
        "",  # no music player
        f"Stopped at {position}",
        f"βΈοΈ Paused - Segment {position}",
        gr.Timer(value=0, active=False),  # make sure no delayed player fires
    )
def reset_radio():
    """Put the radio back into its initial state for a fresh start.

    Clears the playback flags, the planned show, the segment position and the
    news-batch bookkeeping stored in radio_state.
    """
    initial_values = {
        "stop_flag": False,
        "is_playing": False,
        "is_stopped": False,
        "planned_show": [],
        "current_segment_index": 0,
        "current_news_batches": [],
        "news_total_batches": 0,
        "news_batches_played": 0,
    }
    for key, value in initial_values.items():
        radio_state[key] = value
def format_segment_info(segment: Dict[str, Any]) -> str:
    """Format a show segment as Markdown for the segment display.

    Optional fields that are missing fall back to placeholder text instead of
    raising, so a segment with partial metadata can still be shown.

    Args:
        segment: Segment dict. "type" is required; the other keys read depend
            on the type ("content", "track", "commentary", "_youtube_url",
            "script"/"script_batches", "news_items", "podcast", "intro").

    Returns:
        Markdown text describing the segment, or a generic label when the
        type is not recognized.

    Raises:
        KeyError: If the segment has no "type" key.
    """
    seg_type = segment["type"]

    if seg_type == "intro":
        return f"ποΈ **Welcome to AI Radio!**\n\n{segment.get('content', '')}"

    if seg_type == "outro":
        return f"π **Thanks for Listening!**\n\n{segment.get('content', '')}"

    if seg_type == "music":
        track = segment.get("track", {})
        if not track:
            return "π΅ Music Time!"
        music_info = f"""π΅ **Now Playing**
**{track.get('title', 'Unknown')}**
by {track.get('artist', 'Unknown')}
Genre: {track.get('genre', 'Unknown')}
Duration: {track.get('duration', 'Unknown')}s
*{segment.get('commentary', '')}*
"""
        # Prefer the resolved YouTube URL; otherwise use the track's own URL
        # when it points at YouTube.
        if segment.get("_youtube_url"):
            music_info += f"\n\nπ [Listen on YouTube]({segment['_youtube_url']})"
        elif track.get("url") and "youtube" in track.get("url", ""):
            music_info += f"\n\nπ [Listen on YouTube]({track['url']})"
        return music_info

    if seg_type == "news":
        news_items = segment.get("news_items") or []
        # The script is either a list of batches or a single string (old format).
        script = segment.get("script", segment.get("script_batches", ""))
        if isinstance(script, list):
            # Batches are stringified so a non-string entry cannot break join().
            script_text = " ".join(str(batch) for batch in script)
        else:
            script_text = str(script) if script else ""
        headlines = "".join(
            f"\nβ’ {item.get('title', 'No title')}" for item in news_items[:2]
        )
        return "π° **News Update**\n\n" + script_text + "\n\n**Headlines:**\n" + headlines

    if seg_type == "podcast":
        podcast = segment.get("podcast", {})
        if not podcast:
            return "ποΈ Podcast Time!"
        try:
            stars = int(podcast.get('rating', 4))
        except (TypeError, ValueError):
            # Missing or non-numeric rating: fall back to the default of 4.
            stars = 4
        return f"""ποΈ **Podcast Recommendation**
**{podcast.get('title', 'Podcast')}**
Hosted by {podcast.get('host', 'Unknown Host')}
{podcast.get('description', '')}
Duration: {podcast.get('duration', 'Unknown')}
Rating: {'β' * stars}
*{segment.get('intro', '')}*
"""

    if seg_type == "story":
        return f"π **Story Time**\n\n{segment.get('content', '')}"

    return "π» Radio Segment"
def get_segment_text(segment: Dict[str, Any]) -> str:
    """Return the text to be spoken (TTS input) for a segment.

    News segments contribute their script batches joined by spaces (or the
    plain script string); every other known type contributes one field of the
    segment. Unknown types yield an empty string.
    """
    kind = segment["type"]

    if kind == "news":
        batches = segment.get("script_batches", segment.get("script", []))
        if not isinstance(batches, list):
            # Old format: the script is a single value rather than batches.
            return str(batches) if batches else ""
        return " ".join(str(part) for part in batches)

    # Which segment field holds the spoken text, per segment type.
    spoken_field_by_types = (
        (("intro", "outro", "story"), "content"),
        (("music",), "commentary"),
        (("podcast",), "intro"),
    )
    for kinds, field in spoken_field_by_types:
        if kind in kinds:
            return str(segment.get(field, ""))
    return ""
def split_text_into_segments(text: str, max_sentences: int = 2) -> List[str]:
    """Break a long LLM answer into short chunks of sentences for the UI.

    Sentences are detected by whitespace following '.', '!' or '?'. Each
    returned chunk holds up to max_sentences sentences joined by a space.
    """
    if not text:
        return []

    # Split after sentence-ending punctuation and drop empty fragments.
    fragments = (part.strip() for part in re.split(r'(?<=[.!?])\s+', text.strip()))
    sentences = [fragment for fragment in fragments if fragment]

    # A chunk is emitted as soon as it reaches max_sentences, so a limit below
    # one still yields one sentence per chunk.
    chunk_size = max(1, max_sentences)
    return [
        " ".join(sentences[start:start + chunk_size])
        for start in range(0, len(sentences), chunk_size)
    ]
def _voice_request_failure(message):
    """Return the handler's 4-tuple for a request that ends without playback."""
    return message, None, "", gr.Timer(value=0, active=False)


def _youtube_id_from_track(track):
    """Return the track's YouTube video ID, or "" when it cannot be determined."""
    youtube_id = track.get("youtube_id") or ""
    if youtube_id:
        return youtube_id
    url = track.get("url") or ""
    if "v=" in url:
        return url.split("v=")[-1].split("&")[0]
    if "youtu.be/" in url:
        return url.split("youtu.be/")[-1].split("?")[0]
    return ""


def _read_voice_audio(sr, recognizer, audio_path):
    """Load a recording for recognition, converting it to WAV when needed.

    Any WAV file created along the way is removed again before returning,
    because recognizer.record() has already read the audio into memory.

    Args:
        sr: The imported speech_recognition module.
        recognizer: The sr.Recognizer used to read the audio.
        audio_path: Path of the recorded audio file.

    Returns:
        The audio data produced by recognizer.record().

    Raises:
        TypeError: If audio_path is not a filesystem path.
        Exception: Whatever the final conversion/read attempt raised.
    """
    audio_path = os.fspath(audio_path)
    temp_paths = []
    try:
        # speech_recognition reads WAV, so convert other formats first.
        if audio_path and not audio_path.endswith('.wav'):
            # splitext only touches the file name, never a dotted directory.
            wav_path = os.path.splitext(audio_path)[0] + '.wav'
            try:
                AudioSegment.from_file(audio_path).export(wav_path, format="wav")
            except Exception as e:
                print(f"β οΈ Could not convert audio: {e}, trying original file")
            else:
                temp_paths.append(wav_path)
                audio_path = wav_path

        try:
            with sr.AudioFile(audio_path) as source:
                return recognizer.record(source)
        except Exception:
            # Second attempt: re-encode through pydub into the audio directory.
            wav_temp = os.path.join(AUDIO_DIR, f"temp_voice_{int(time.time())}.wav")
            temp_paths.append(wav_temp)
            AudioSegment.from_file(audio_path).export(wav_temp, format="wav")
            with sr.AudioFile(wav_temp) as source:
                return recognizer.record(source)
    finally:
        for path in temp_paths:
            try:
                os.remove(path)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass


def handle_voice_request(audio_file):
    """Handle a spoken song request recorded in the UI.

    Transcribes the recording, searches for a matching track, generates the
    host's spoken reply and builds the player HTML for the track. For YouTube
    tracks the real player is stored in radio_state["pending_player_html"] and
    a placeholder plus an active timer are returned, so the player can be
    swapped in shortly before the host finishes speaking.

    Args:
        audio_file: Path of the recorded audio, or a tuple whose second item
            (or only item) is that path.

    Returns:
        A 4-tuple of (status text, host audio file path or None, music player
        HTML, gr.Timer). On any failure the status text describes the problem
        and the remaining items are None, "" and an inactive timer.
    """
    if not audio_file:
        return _voice_request_failure("β οΈ Please record your voice request first!")

    try:
        # Imported lazily so these are only needed when a request is handled.
        import speech_recognition as sr
        from urllib.parse import quote

        recognizer = sr.Recognizer()

        audio_path = audio_file
        if isinstance(audio_file, tuple):
            # Gradio Audio returns (sample_rate, audio_data) or filepath
            audio_path = audio_file[1] if len(audio_file) > 1 else audio_file[0]

        try:
            audio = _read_voice_audio(sr, recognizer, audio_path)
        except Exception as conv_e:
            return _voice_request_failure(
                f"β Could not process audio file: {conv_e}. Please try recording again."
            )

        # Recognize speech using Google's API
        try:
            recognized_text = recognizer.recognize_google(audio)
            print(f"π€ Recognized: {recognized_text}")
        except sr.UnknownValueError:
            return _voice_request_failure(
                "β Could not understand audio. Please speak clearly and try again."
            )
        except sr.RequestError as e:
            return _voice_request_failure(
                f"β Error with speech recognition service: {e}. Please try again."
            )

        # Process the request
        song_request = voice_input_service.process_song_request(recognized_text)
        print(f"π€ Processed request: {song_request}")

        # Search for music
        tracks = agent.music_server.search_by_request(song_request)
        if not tracks:
            return _voice_request_failure(
                f"β Could not find music for: '{recognized_text}'. Try saying something like 'play pop music' or 'play a song by [artist name]'!"
            )

        # Get the first matching track; tolerate missing metadata fields.
        track = tracks[0]
        title = track.get('title', 'Unknown')
        artist = track.get('artist', 'Unknown')
        track_url = track.get('url') or ''
        print(f"π΅ Selected track: {title} by {artist}")

        # Store current track for like/dislike buttons
        radio_state["current_track"] = track

        # Add to user history
        user_id = radio_state.get("user_id")
        if user_id:
            user_memory.add_to_history(user_id, track)

        # Generate host response using LLM
        user_prefs = radio_state.get("user_preferences", {})
        host_response = agent.generate_song_request_response(recognized_text, track, user_prefs)

        # Generate TTS for host response
        host_audio_file = None
        print(f"ποΈ Generating TTS for: {host_response[:50]}...")
        if getattr(tts_service, "available", True):
            audio_bytes = tts_service.text_to_speech(host_response)
            if audio_bytes:
                host_audio_file = os.path.join(AUDIO_DIR, f"voice_request_{int(time.time())}.mp3")
                tts_service.save_audio(audio_bytes, host_audio_file)
                print(f"β Audio saved to: {host_audio_file} ({len(audio_bytes)} bytes)")
            else:
                print("β TTS returned no audio bytes")
        else:
            print("β TTS service not available")

        music_player_html = ""
        timer_config = gr.Timer(value=0, active=False)

        # Modal proxy returns "youtube_api" as source, so check both
        if track.get("source") in ["youtube", "youtube_api"]:
            youtube_id = _youtube_id_from_track(track)
            if youtube_id:
                # Delayed loading mechanism with gr.Timer
                try:
                    # Get host audio duration to calculate delay
                    delay_s = 0.5
                    if host_audio_file and os.path.exists(host_audio_file):
                        duration_ms = len(AudioSegment.from_file(host_audio_file))
                        delay_s = max(0.5, (duration_ms / 1000.0) - 3.0)  # Start 3 seconds before end
                        print(f"β±οΈ Voice request audio: {duration_ms}ms, YouTube delay: {delay_s:.1f}s")

                    # Store the real player for the timer callback and show a
                    # placeholder until the timer fires.
                    radio_state["pending_player_html"] = create_youtube_player_html(
                        youtube_id, title, artist, track_url
                    )
                    music_player_html = create_placeholder_html(title, artist, delay_s, "music")
                    timer_config = gr.Timer(value=delay_s, active=True)
                    print(f"β Voice request: Player scheduled in {delay_s:.1f}s")
                except Exception as e:
                    print(f"β οΈ Error calculating delay for voice request: {e}")
                    # Fallback: immediate play
                    music_player_html = create_youtube_player_html(
                        youtube_id, title, artist, track_url
                    )
            else:
                # No video ID available: offer a plain link instead of an embed.
                music_player_html = f"""
<div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;">
<h4>π΅ {title} - {artist}</h4>
<a href="{track_url or '#'}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff0000; color: white; text-decoration: none; border-radius: 5px;">
βΆοΈ Play on YouTube
</a>
</div>
"""
        elif track.get("source") == "soundcloud":
            # SoundCloud embed with simple iframe (no script for volume control).
            # The track URL is percent-encoded because it is passed as a query
            # parameter of the widget URL.
            music_player_html = f"""
<div style="padding: 1rem; background: linear-gradient(135deg, #ff5500 0%, #ff8800 100%); border-radius: 12px; margin: 1rem 0; box-shadow: 0 4px 15px rgba(0,0,0,0.3);">
<h4 style="margin: 0 0 0.75rem 0; color: #fff; font-size: 1.1em;">π΅ {title}</h4>
<p style="margin: 0 0 0.75rem 0; color: #ffe0cc; font-size: 0.9em;">by {artist}</p>
<iframe width="100%" height="166" scrolling="no" frameborder="no"
style="border-radius: 8px;"
src="https://w.soundcloud.com/player/?url={quote(track_url, safe='')}&color=%23ffffff&auto_play=true&hide_related=true&show_comments=false&show_user=true&show_reposts=false&show_teaser=false">
</iframe>
<p style="margin-top: 0.75rem; font-size: 0.85em; margin-bottom: 0;">
<a href="{track_url}" target="_blank" style="color: #fff; text-decoration: none;">π Open on SoundCloud</a>
</p>
</div>
"""

        # Note: Gradio sanitizes script tags, so autoplay scripts don't work
        # The browser may block autoplay - user may need to click play
        return (
            f"β {host_response}\n\nπ΅ Playing: {title} by {artist}",
            host_audio_file,
            music_player_html,
            timer_config,
        )
    except Exception as e:
        # Top-level boundary for the UI callback: report instead of crashing.
        return _voice_request_failure(f"β Error processing voice request: {e}")
def get_now_playing(segment: Dict[str, Any]) -> str:
    """Return the short "now playing" label for a segment.

    Missing track or podcast fields fall back to placeholder text so that a
    segment with partial metadata cannot abort playback with a KeyError.

    Args:
        segment: Segment dict with a required "type" key and, for music and
            podcast segments, an optional "track" / "podcast" dict.

    Returns:
        A label for the segment type. Music or podcast segments without a
        track/podcast dict, and unknown types, get the generic radio label.

    Raises:
        KeyError: If the segment has no "type" key.
    """
    seg_type = segment["type"]
    if seg_type == "music":
        track = segment.get("track", {})
        if track:
            return f"βͺ {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}"
    elif seg_type == "news":
        return "π° News Update"
    elif seg_type == "podcast":
        podcast = segment.get("podcast", {})
        if podcast:
            return f"ποΈ {podcast.get('title', 'Podcast')}"
    elif seg_type == "story":
        return "π Story Time"
    elif seg_type == "intro":
        return "ποΈ Welcome!"
    elif seg_type == "outro":
        return "π Goodbye!"
    return "π» AI Radio"
def init_user(stored_user_id: str = None) -> tuple:
    """Look up the stored user, or create a new one, and make it current.

    The resolved user ID is written to radio_state, together with the user's
    saved preferences when there are any.

    Args:
        stored_user_id: User ID from browser localStorage/cookie

    Returns:
        Tuple of (user_id, passphrase, is_new_user)
    """
    account = user_memory.get_or_create_user(stored_user_id)
    user_id, passphrase, is_new = account
    radio_state["user_id"] = user_id

    # Only overwrite the in-memory preferences when something was saved.
    stored_prefs = user_memory.get_user_preferences(user_id)
    if not stored_prefs:
        return user_id, passphrase, is_new

    radio_state["user_preferences"] = stored_prefs
    return user_id, passphrase, is_new
def authenticate_with_passphrase(passphrase: str) -> tuple:
    """Log a user in by passphrase and make that user current.

    The passphrase is stripped and lower-cased before lookup. On success the
    user ID and any saved preferences are written to radio_state.

    Args:
        passphrase: User's passphrase (e.g. "swift-tiger-aurora")

    Returns:
        Tuple of (success_message, user_id, passphrase). On failure the user
        ID and passphrase are empty strings.
    """
    if not (passphrase and passphrase.strip()):
        return "β οΈ Please enter your passphrase", "", ""

    normalized = passphrase.strip().lower()
    user_id = user_memory.get_user_by_passphrase(normalized)
    if not user_id:
        return "β Invalid passphrase. Please check and try again.", "", ""

    radio_state["user_id"] = user_id

    # Load user preferences
    stored_prefs = user_memory.get_user_preferences(user_id)
    if stored_prefs:
        radio_state["user_preferences"] = stored_prefs

    stats = user_memory.get_user_stats(user_id)
    welcome = (
        "β Welcome back! Logged in successfully.\n\nπ Your stats:"
        f"\nβ’ Liked tracks: {stats.get('total_likes', 0)}"
        f"\nβ’ Play history: {stats.get('total_plays', 0)} tracks"
    )
    return welcome, user_id, normalized
def create_new_account() -> tuple:
    """Create a fresh user account and make it the current user.

    Returns:
        Tuple of (message, user_id, passphrase), where the message shows the
        new passphrase and asks the user to save it.
    """
    user_id, passphrase, _is_new = user_memory.get_or_create_user()
    radio_state["user_id"] = user_id

    message_lines = [
        "π Welcome to AI Radio!",
        "Your personal passphrase is:",
        f"π **{passphrase}**",
        "β οΈ **Save this passphrase!** You'll need it to access your account from other devices or if cookies are cleared.",
    ]
    return "\n".join(message_lines), user_id, passphrase
def like_current_track() -> str:
    """Record a like for the track currently stored in radio_state.

    Returns:
        A status message: a warning when no user or track is set, otherwise
        whether the like was saved.
    """
    user_id = radio_state.get("user_id")
    if not user_id:
        return "β οΈ User not initialized"

    track = radio_state.get("current_track")
    if not track:
        return "β οΈ No track currently playing"

    saved = user_memory.like_track(user_id, track)
    if not saved:
        return "β Could not save like"

    title = track.get('title', 'Unknown')
    artist = track.get('artist', 'Unknown')
    return f"π Liked: {title} by {artist}"
def dislike_current_track() -> str:
    """Record a dislike for the track currently stored in radio_state.

    Returns:
        A status message: a warning when no user or track is set, otherwise
        whether the dislike was saved.
    """
    user_id = radio_state.get("user_id")
    if not user_id:
        return "β οΈ User not initialized"

    track = radio_state.get("current_track")
    if not track:
        return "β οΈ No track currently playing"

    saved = user_memory.dislike_track(user_id, track)
    if not saved:
        return "β Could not save dislike"

    title = track.get('title', 'Unknown')
    return f"π Won't play again: {title}"
def get_liked_tracks_display() -> str:
    """Build the Markdown list of the current user's liked tracks.

    Only the last 20 liked tracks are listed; when there are more, a trailing
    line reports how many were left out.
    """
    user_id = radio_state.get("user_id")
    if not user_id:
        return "No liked tracks yet. Start listening and like some songs!"

    liked = user_memory.get_liked_tracks(user_id)
    if not liked:
        return "No liked tracks yet. Like some songs while listening!"

    shown = liked[-20:]  # Show last 20
    lines = ["### β€οΈ Your Liked Tracks\n"]
    lines.extend(
        f"{position}. **{entry.get('title', 'Unknown')}** - {entry.get('artist', 'Unknown')}"
        for position, entry in enumerate(shown, 1)
    )

    hidden_count = len(liked) - 20
    if hidden_count > 0:
        lines.append(f"\n*...and {hidden_count} more liked tracks*")

    return "\n".join(lines)
def get_user_stats_display() -> str:
    """Build the Markdown stats summary for the current user.

    Returns a prompt to start listening when no user is set.
    """
    user_id = radio_state.get("user_id")
    if not user_id:
        return "Start listening to see your stats!"

    stats = user_memory.get_user_stats(user_id)
    liked_count = len(user_memory.get_liked_tracks(user_id))

    total_plays = stats.get('total_plays', 0)
    total_dislikes = stats.get('total_dislikes', 0)

    summary_lines = [
        "### π Your Stats",
        f"π΅ **Total Tracks Played:** {total_plays}",
        f"π **Liked Tracks:** {liked_count}",
        f"π **Disliked Tracks:** {total_dislikes}",
        "*Your liked tracks have a 30% chance of playing during music segments!*",
    ]
    # The original text ends with a newline after the last line.
    return "\n".join(summary_lines) + "\n"
def get_stats():
    """Get listening statistics for the current user as Markdown.

    Returns:
        A Markdown summary with the user ID, total plays, liked and disliked
        track counts and the length of the recent play history. When no user
        is set, a prompt to log in or create an account is returned instead.
    """
    user_id = radio_state.get("user_id")
    if not user_id:
        return """π **Your Listening Stats**
*Log in or create an account to track your listening history!*
"""

    # Get user-specific stats from user_memory
    user_stats = user_memory.get_user_stats(user_id)
    liked_tracks = user_memory.get_liked_tracks(user_id)
    disliked_tracks = user_memory.get_disliked_tracks(user_id)
    play_history = user_memory.get_play_history(user_id)

    return f"""π **Your Listening Stats**
π€ **User ID:** {user_id}
π΅ **Total Tracks Played:** {user_stats.get('total_plays', 0)}
π **Liked Tracks:** {len(liked_tracks)}
π **Disliked Tracks:** {len(disliked_tracks)}
π **Recent History:** {len(play_history)} tracks
*Your liked tracks have a 30% chance of playing during music segments!*
*Disliked tracks will be skipped automatically.*
"""
# Custom CSS for the radio station UI, passed to gr.Blocks(css=...).
# It styles the header banner (#radio-header), the now-playing bar
# (#now-playing), the segment display (#segment-display), the large control
# buttons (.control-button) and the stats panel (#stats-panel).
custom_css = """
#radio-header {
text-align: center;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 2rem;
border-radius: 10px;
margin-bottom: 1rem;
}
#now-playing {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
color: white;
padding: 1.5rem;
border-radius: 10px;
text-align: center;
font-size: 1.2em;
font-weight: bold;
margin: 1rem 0;
}
#segment-display {
background: #f8f9fa;
padding: 2rem;
border-radius: 10px;
min-height: 200px;
border-left: 4px solid #667eea;
}
.control-button {
font-size: 1.1em !important;
padding: 0.8rem !important;
}
#stats-panel {
background: linear-gradient(135deg, #84fab0 0%, #8fd3f4 100%);
padding: 1.5rem;
border-radius: 10px;
color: #333;
}
"""
# Auto-play JavaScript, injected into the page <head> via gr.Blocks(head=...).
#
# What the script does:
# - Attaches an 'ended' listener to the <audio> element inside #host_audio.
# - When the host audio ends (and auto-play is enabled), waits
#   musicPlayDuration milliseconds (60000 by default) and then clicks the
#   first button whose text contains 'Next Segment'.
# - Re-runs the listener setup whenever a MutationObserver on document.body
#   sees audio-related changes, and additionally every 5 seconds as a backup;
#   both are started 2 seconds after the script loads.
# - Exposes window.toggleAutoPlay(enabled), window.setMusicPlayDuration(ms),
#   window.cancelNextSegment() and window.playNextNow() for manual control.
auto_play_head = """
<script>
(function() {
let autoPlayEnabled = true;
let nextSegmentTimeout = null;
let currentAudioElement = null;
let musicPlayDuration = 60000; // Wait 60 seconds for music to play
function clickNextSegment() {
const buttons = document.querySelectorAll('button');
for (let btn of buttons) {
if (btn.textContent.includes('Next Segment')) {
console.log('βοΈ Auto-clicking Next Segment button...');
btn.click();
return true;
}
}
return false;
}
function onAudioEnded() {
if (!autoPlayEnabled) return;
console.log('π΅ Host audio ended, scheduling next segment in ' + (musicPlayDuration/1000) + 's...');
// Clear any existing timeout
if (nextSegmentTimeout) clearTimeout(nextSegmentTimeout);
// Wait for music/podcast to play, then auto-advance
nextSegmentTimeout = setTimeout(function() {
clickNextSegment();
}, musicPlayDuration);
}
function setupAutoPlay() {
// Find all audio elements in the host_audio container
const audioContainer = document.querySelector('#host_audio');
if (!audioContainer) {
return;
}
const audio = audioContainer.querySelector('audio');
if (!audio) {
return;
}
// If this is a different audio element than before, set up new listener
if (audio !== currentAudioElement) {
console.log('π New audio element detected, setting up auto-play...');
currentAudioElement = audio;
// Remove old listener if exists (defensive)
audio.removeEventListener('ended', onAudioEnded);
// Add new listener
audio.addEventListener('ended', onAudioEnded);
console.log('β Auto-play listener attached to audio element');
}
}
// Set up on load
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', function() {
setupAutoPlay();
});
} else {
setupAutoPlay();
}
// Re-setup when DOM changes (Gradio updates audio element)
const observer = new MutationObserver(function(mutations) {
// Check if audio-related changes
for (let mutation of mutations) {
if (mutation.type === 'childList') {
// Look for audio element changes
const hasAudioChange = Array.from(mutation.addedNodes).some(node =>
node.nodeType === 1 && (node.tagName === 'AUDIO' || node.querySelector && node.querySelector('audio'))
);
if (hasAudioChange || mutation.target.id === 'host_audio' ||
(mutation.target.closest && mutation.target.closest('#host_audio'))) {
setupAutoPlay();
break;
}
}
}
});
// Start observing after a short delay to let Gradio initialize
setTimeout(function() {
observer.observe(document.body, { childList: true, subtree: true });
setupAutoPlay(); // Initial setup
// Also periodically check (backup)
setInterval(setupAutoPlay, 5000);
}, 2000);
// Expose functions globally
window.toggleAutoPlay = function(enabled) {
autoPlayEnabled = enabled;
console.log('Auto-play ' + (enabled ? 'enabled' : 'disabled'));
};
window.setMusicPlayDuration = function(ms) {
musicPlayDuration = ms;
console.log('Music play duration set to ' + (ms/1000) + 's');
};
window.cancelNextSegment = function() {
if (nextSegmentTimeout) {
clearTimeout(nextSegmentTimeout);
nextSegmentTimeout = null;
console.log('βΉοΈ Next segment cancelled');
}
};
window.playNextNow = function() {
if (nextSegmentTimeout) clearTimeout(nextSegmentTimeout);
clickNextSegment();
};
})();
</script>
"""
| # Build Gradio Interface | |
| with gr.Blocks(css=custom_css, title="AI Radio π΅", theme=gr.themes.Soft(), head=auto_play_head) as demo: | |
| # Hidden state for user ID (persisted via localStorage) | |
| user_id_state = gr.State(value=None) | |
| # Header | |
| gr.HTML(""" | |
| <div id="radio-header"> | |
| <h1>π΅ AI Radio Station π΅</h1> | |
| <p style="font-size: 1.2em; margin: 0;">Your Personal AI-Powered Radio Experience</p> | |
| <p style="margin: 0.5rem 0 0 0; opacity: 0.9;">Personalized Music β’ News β’ Podcasts β’ Stories</p> | |
| </div> | |
| """) | |
| with gr.Tabs() as tabs: | |
| # Tab 0: Account/Auth | |
| with gr.Tab("π Account", id=0): | |
| gr.Markdown("### π§ Welcome to AI Radio") | |
| # Hidden state for auth | |
| auth_user_id = gr.Textbox(visible=False, elem_id="auth_user_id") | |
| auth_passphrase = gr.Textbox(visible=False, elem_id="auth_passphrase") | |
| with gr.Group(): | |
| auth_status = gr.Markdown( | |
| value="*Loading your account...*", | |
| elem_id="auth_status" | |
| ) | |
| # Current passphrase display (shown after login/signup) | |
| passphrase_display = gr.Textbox( | |
| label="π Your Passphrase (save this!)", | |
| value="", | |
| interactive=False, | |
| visible=True, | |
| elem_id="passphrase_display" | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown("### π Login with Passphrase") | |
| gr.Markdown("*Enter your passphrase to access your saved preferences, liked tracks, and history from any device.*") | |
| with gr.Row(): | |
| passphrase_input = gr.Textbox( | |
| label="Enter Passphrase", | |
| placeholder="e.g., swift-tiger-aurora", | |
| scale=3 | |
| ) | |
| login_btn = gr.Button("π Login", variant="primary", scale=1) | |
| login_status = gr.Textbox(label="Login Status", interactive=False, visible=True) | |
| gr.Markdown("---") | |
| gr.Markdown("### π New User?") | |
| gr.Markdown("*Click below to create a new account. You'll receive a unique passphrase to remember.*") | |
| create_account_btn = gr.Button("β¨ Create New Account", variant="secondary") | |
| gr.Markdown("---") | |
| gr.Markdown(""" | |
| **π‘ How it works:** | |
| - If you are here for the first time, create a new account | |
| - Use the same passphrase on any device to access your account | |
| - Just enter your passphrase to log back in | |
| """) | |
| # Tab 1: Preferences | |
# Preferences tab (id=1). The ten widgets created here are the inputs of
# save_pref_btn.click (wired to save_preferences) and are also listed as
# outputs of the login and page-load handlers so they can be auto-filled.
| with gr.Tab("βοΈ Your Preferences", id=1): | |
| gr.Markdown("### π― Personalize Your Radio Experience") | |
| gr.Markdown("Tell us about your preferences so we can create the perfect radio show for you!") | |
| with gr.Row(): | |
| with gr.Column(): | |
| name_input = gr.Textbox( | |
| label="π€ Your Name", | |
| placeholder="Enter your name", | |
| value="Friend" | |
| ) | |
| mood_input = gr.Dropdown( | |
| label="π Current Mood", | |
| choices=["happy", "energetic", "calm", "focused", "relaxed"], | |
| value="happy" | |
| ) | |
| # Add voice selection dropdown | |
# The dropdown choices are the keys of VOICE_OPTIONS from tts_service.
# NOTE(review): the default "Rachel (Female)" is presumably one of those
# keys - confirm against tts_service.
| from tts_service import VOICE_OPTIONS | |
| voice_input = gr.Dropdown( | |
| label="π€ Host Voice", | |
| choices=list(VOICE_OPTIONS.keys()), | |
| value="Rachel (Female)", | |
| info="Choose the voice for your radio host" | |
| ) | |
| with gr.Column(): | |
# NOTE(review): the initial value here is ["rap"], while the no-stored-user
# branch of handle_init_from_cookie returns ["pop", "rock"] for this widget
# on page load - confirm which default is intended.
| genres_input = gr.Dropdown( | |
| label="π΅ Favorite Music Genres", | |
| choices=["pop", "rock", "jazz", "classical", "electronic", "country", "indie", "rap", "blues", "folk", "hip-hop"], | |
| multiselect=True, | |
| value=["rap"] | |
| ) | |
| interests_input = gr.Dropdown( | |
| label="π° News Interests", | |
| choices=["technology", "world", "business", "entertainment", "science", "sports"], | |
| multiselect=True, | |
| value=["technology", "world"] | |
| ) | |
| podcast_input = gr.Dropdown( | |
| label="ποΈ Podcast Interests", | |
| choices=["technology", "business", "comedy", "education", "news", "true-crime"], | |
| multiselect=True, | |
| value=["technology"] | |
| ) | |
# Four independent on/off switches, one per content type; all start enabled.
| gr.Markdown("### ποΈ Content Filter") | |
| gr.Markdown("Choose what types of content you want to hear:") | |
| with gr.Row(): | |
| music_filter = gr.Checkbox(label="π΅ Music", value=True) | |
| news_filter = gr.Checkbox(label="π° News", value=True) | |
| podcast_filter = gr.Checkbox(label="ποΈ Podcasts", value=True) | |
| story_filter = gr.Checkbox(label="π AI Stories", value=True) | |
# pref_status is the single output of save_preferences; the click handler
# itself is attached later, once the tabs container can be targeted.
| save_pref_btn = gr.Button("πΎ Save & Go to Radio βΆοΈ", variant="primary", size="lg") | |
| pref_status = gr.Textbox(label="Status", interactive=False) | |
| # Tab 2: Radio Player | |
# Builds the playback controls, the voice-request recorder, the rating
# buttons, and the output components that the playback handlers update.
| with gr.Tab("π» Radio Player", id=2): | |
| gr.Markdown("### π§ Start your personalized radio experience!") | |
| now_playing = gr.HTML("<div id='now-playing'>π» Ready to start</div>") | |
| with gr.Row(): | |
| start_btn = gr.Button("βΆοΈ Generate & Play", variant="primary", size="lg", elem_classes="control-button") | |
| next_btn = gr.Button("βοΈ Next Segment", variant="secondary", size="lg", elem_classes="control-button") | |
| stop_btn = gr.Button("βΉοΈ Stop", variant="stop", size="lg", elem_classes="control-button") | |
# The recorder hands handle_voice_request a file path (type="filepath")
# to a WAV recording captured from the microphone.
| with gr.Row(): | |
| voice_audio = gr.Audio( | |
| label="π€ Record Your Song Request", | |
| type="filepath", | |
| sources=["microphone"], | |
| format="wav" | |
| ) | |
| voice_btn = gr.Button("π€ Ask For Song", variant="primary", size="lg", elem_classes="control-button") | |
| voice_status = gr.Textbox(label="Voice Request Status", value="Record your voice request above, then click the button", interactive=False) | |
| # Like/Dislike buttons for current track | |
| with gr.Row(): | |
| like_btn = gr.Button("π Like This Song", variant="secondary", size="sm") | |
| dislike_btn = gr.Button("π Don't Play Again", variant="secondary", size="sm") | |
| like_status = gr.Textbox(label="Rating Status", value="Rate the current song!", interactive=False, scale=2) | |
# progress_text, llm_script and status_text are created with visible=False
# but are still listed as outputs of the playback handlers wired below.
| progress_text = gr.Textbox(label="Progress", value="Ready to start", interactive=False, visible=False) | |
| segment_info = gr.Markdown("**Welcome!** Set your preferences and start the radio.", elem_id="segment-display") | |
| llm_script = gr.Textbox( | |
| label="π§ Model Script (LLM answer)", | |
| value="Model-generated script will appear here for each segment.", | |
| interactive=False, | |
| lines=6, | |
| visible=False | |
| ) | |
| gr.Markdown("**π‘ Tip:** Host speech plays first, then music/podcasts will stream automatically!") | |
| audio_output = gr.Audio(label="π Host Speech", autoplay=True, type="filepath", elem_id="host_audio") | |
| music_player = gr.HTML(label="π΅ Music/Podcast Player (streaming)") | |
| # Timer for delayed player loading (start disabled) | |
# Each tick calls show_pending_player, whose return values update both the
# player HTML and the timer itself. The playback handlers also list
# player_timer among their outputs, so they can update it too.
| player_timer = gr.Timer(value=0, active=False) | |
| player_timer.tick( | |
| fn=show_pending_player, | |
| inputs=[], | |
| outputs=[music_player, player_timer] | |
| ) | |
| status_text = gr.Textbox(label="Status", value="Ready", interactive=False, visible=False) | |
# Wire the Radio Player buttons. Each handler's return values are mapped
# positionally onto the components listed in ``outputs``.
start_btn.click(
    fn=start_and_play_first_segment,
    inputs=[],
    outputs=[
        status_text,
        segment_info,
        audio_output,
        music_player,
        progress_text,
        now_playing,
        llm_script,
        player_timer,
    ],
)

next_btn.click(
    fn=play_next_segment,
    inputs=[],
    outputs=[
        segment_info,
        audio_output,
        music_player,
        progress_text,
        now_playing,
        llm_script,
        player_timer,
    ],
)

# Stop also runs a browser-side hook (if the page defined one) that cancels
# a pending next segment.
stop_btn.click(
    fn=stop_radio,
    inputs=[],
    outputs=[
        status_text,
        audio_output,
        music_player,
        progress_text,
        now_playing,
        player_timer,
    ],
    js="() => { if(window.cancelNextSegment) window.cancelNextSegment(); }",
)

# Voice request: the recorded clip is passed to the handler.
voice_btn.click(
    fn=handle_voice_request,
    inputs=[voice_audio],
    outputs=[voice_status, audio_output, music_player, player_timer],
)

# Track rating: both buttons report through the same status box.
like_btn.click(fn=like_current_track, inputs=[], outputs=[like_status])
dislike_btn.click(fn=dislike_current_track, inputs=[], outputs=[like_status])

# Save preferences, then switch to the Radio Player tab (id=2). This is
# wired here because it has to come after both tabs are defined.
save_pref_btn.click(
    fn=save_preferences,
    inputs=[
        name_input,
        genres_input,
        interests_input,
        podcast_input,
        mood_input,
        music_filter,
        news_filter,
        podcast_filter,
        story_filter,
        voice_input,
    ],
    outputs=[pref_status],
).then(
    fn=lambda: gr.Tabs(selected=2),
    outputs=[tabs],
)
# Auth button handlers
def handle_login(passphrase):
    """Log in with a passphrase and build the values for the login outputs.

    Calls ``authenticate_with_passphrase``; when it yields a user id, the
    saved preferences are fetched with ``get_saved_preferences`` so the
    preference form can be auto-filled.

    Args:
        passphrase: Passphrase text taken from the login textbox.

    Returns:
        A 15-item tuple matching the ``outputs`` of ``login_btn.click``:
        ``(message, user_id, passphrase, passphrase_display, auth_status_md,
        name, genres, interests, podcasts, mood, music, news, podcast,
        story, voice)``. On a failed login the four auth values are empty /
        "*Login failed*" and the ten preference values are no-op updates.
    """
    message, user_id, pp = authenticate_with_passphrase(passphrase)

    if not user_id:
        # Failed login: clear the auth fields, but leave the preference form
        # alone. Returning hard-coded defaults here would silently discard
        # whatever the visitor had already entered on the Preferences tab.
        unchanged_prefs = tuple(gr.update() for _ in range(10))
        return (message, "", "", "", "*Login failed*", *unchanged_prefs)

    # Successful login: auto-fill the form from the stored preferences.
    prefs = get_saved_preferences()
    return (message, user_id, pp, pp, f"β Logged in! Passphrase: **{pp}**",
            prefs[0], prefs[1], prefs[2], prefs[3], prefs[4],
            prefs[5], prefs[6], prefs[7], prefs[8], prefs[9])
def handle_create_account():
    """Create a new account and return the values for its four outputs.

    Returns:
        ``(message, user_id, passphrase, passphrase)`` - the passphrase is
        repeated because it feeds both the stored value and the display box.
    """
    status_message, new_user_id, new_passphrase = create_new_account()
    return (status_message, new_user_id, new_passphrase, new_passphrase)
def handle_init_from_cookie(stored_user_id):
    """Restore a session from a browser-stored user id on page load.

    Despite the name, the id is read from ``localStorage`` by the
    ``demo.load`` chain that calls this handler.

    Args:
        stored_user_id: The id read in the browser, or an empty string.

    Returns:
        A 14-item tuple: ``(message, user_id, passphrase, passphrase_display,
        name, genres, interests, podcasts, mood, music, news, podcast,
        story, voice)``.
    """
    # Guard clause: nothing usable was stored, so return the default form.
    if not (stored_user_id and stored_user_id.strip()):
        return ("*No saved account found. Create a new account or log in with your passphrase.*",
                "", "", "",
                "Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy",
                True, True, True, True, "Rachel (Female)")

    user_id, passphrase, is_new = init_user(stored_user_id)
    prefs = get_saved_preferences()

    if is_new:
        message = f"""π Welcome to AI Radio!
Your personal passphrase is:
π **{passphrase}**
β οΈ **Save this passphrase!** You'll need it to access your account from other devices."""
    else:
        stats = user_memory.get_user_stats(user_id)
        message = f"""β Welcome back!
Your passphrase: **{passphrase}**
π Your stats:
β’ Liked tracks: {stats.get('total_likes', 0)}
β’ Play history: {stats.get('total_plays', 0)} tracks"""

    # Auth values first, then the ten preference values in widget order.
    auth_values = (message, user_id, passphrase, passphrase)
    preference_values = tuple(prefs[index] for index in range(10))
    return auth_values + preference_values
# Login button: one handler returns the auth values and the auto-filled
# preferences; the follow-up steps persist the credentials in the browser
# and pick the tab to show.
login_store_js = "(userId, passphrase) => { if(userId && passphrase) { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); } }"

login_btn.click(
    fn=handle_login,
    inputs=[passphrase_input],
    outputs=[
        login_status,
        auth_user_id,
        auth_passphrase,
        passphrase_display,
        auth_status,
        name_input,
        genres_input,
        interests_input,
        podcast_input,
        mood_input,
        music_filter,
        news_filter,
        podcast_filter,
        story_filter,
        voice_input,
    ],
).then(
    # Browser-only step: save to localStorage (skipped when either value is empty).
    fn=None,
    inputs=[auth_user_id, auth_passphrase],
    outputs=[],
    js=login_store_js,
).then(
    # Radio Player tab (id=2) after a successful login, otherwise tab 0.
    fn=lambda uid: gr.Tabs(selected=2) if uid else gr.Tabs(selected=0),
    inputs=[auth_user_id],
    outputs=[tabs],
)
# Create account button: create the account, persist the new credentials in
# the browser, then open the Preferences tab (id=1).
create_account_btn.click(
    fn=handle_create_account,
    inputs=[],
    outputs=[auth_status, auth_user_id, auth_passphrase, passphrase_display],
).then(
    # Browser-only step: save to localStorage. Guarded like the login and
    # page-load chains, so a failed creation cannot store an empty value
    # (or the string "null") that the next page load would read back as a
    # user id.
    fn=None,
    inputs=[auth_user_id, auth_passphrase],
    outputs=[],
    js="(userId, passphrase) => { if(userId && passphrase) { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); } }",
).then(
    # Go to preferences tab
    fn=lambda: gr.Tabs(selected=1),
    outputs=[tabs],
)
# Page-load initialisation, in three chained steps:
#   1. read the stored user id from localStorage (browser only),
#   2. let the server restore the user and the preference form,
#   3. write the credentials back to localStorage (covers the case where a
#      new user was created in step 2).
read_stored_id_js = "() => { return localStorage.getItem('ai_radio_user_id') || ''; }"
write_credentials_js = "(userId, passphrase) => { if(userId && passphrase) { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); } }"

demo.load(
    fn=None,
    inputs=[],
    outputs=[auth_user_id],
    js=read_stored_id_js,
).then(
    fn=handle_init_from_cookie,
    inputs=[auth_user_id],
    outputs=[
        auth_status,
        auth_user_id,
        auth_passphrase,
        passphrase_display,
        name_input,
        genres_input,
        interests_input,
        podcast_input,
        mood_input,
        music_filter,
        news_filter,
        podcast_filter,
        story_filter,
        voice_input,
    ],
).then(
    fn=None,
    inputs=[auth_user_id, auth_passphrase],
    outputs=[],
    js=write_credentials_js,
)
# Tab 3: Liked Tracks
with gr.Tab("β€οΈ Liked Tracks", id=3):
    gr.Markdown("### β€οΈ Your Liked Tracks")
    gr.Markdown("Songs you've liked will be played again sometimes! (30% chance during music segments)")

    liked_display = gr.Markdown("No liked tracks yet. Like some songs while listening!")
    user_stats_display = gr.Markdown("")
    refresh_liked_btn = gr.Button("π Refresh Liked Tracks", variant="secondary")

    # One click refreshes the track list first, then the stats summary.
    refresh_liked_btn.click(
        fn=get_liked_tracks_display,
        inputs=[],
        outputs=[liked_display],
    ).then(
        fn=get_user_stats_display,
        inputs=[],
        outputs=[user_stats_display],
    )
# Tab 4: Statistics
with gr.Tab("π Your Stats", id=4):
    gr.Markdown("### π Your Listening Statistics")
    stats_display = gr.Markdown(elem_id="stats-panel")
    refresh_stats_btn = gr.Button("π Refresh Stats", variant="secondary")

    # Manual refresh.
    refresh_stats_btn.click(
        fn=get_stats,
        inputs=[],
        outputs=[stats_display],
    )

    # Initial fill. This is a demo.load listener, so it runs when the page
    # loads rather than when this tab is opened; afterwards the panel only
    # changes through the refresh button.
    demo.load(
        fn=get_stats,
        inputs=[],
        outputs=[stats_display],
    )
| # Tab 5: About | |
# Static page: a single Markdown literal describing features, architecture,
# technology stack and usage. No component created here is bound to a name,
# and no event listener is attached in this tab.
| with gr.Tab("βΉοΈ About", id=5): | |
| gr.Markdown(""" | |
| # π΅ AI Radio - Your Personal Radio Station | |
| ## About This App | |
| AI Radio is an intelligent, personalized radio station powered by cutting-edge AI technology. | |
| It creates a unique listening experience tailored to your preferences, mood, and interests. | |
| ## π Features | |
| - **π΅ Personalized Music**: Curated tracks from YouTube based on your favorite genres and mood | |
| - **π° Custom News**: Real-time news updates on topics you care about | |
| - **ποΈ Podcast Recommendations**: Discover interesting podcasts matching your interests | |
| - **π AI-Generated Stories**: Entertaining stories and fun facts | |
| - **π€ AI Host**: Dynamic AI radio host that introduces segments (choose your preferred voice!) | |
| - **πΎ Smart Recommendations**: RAG system learns from your listening history | |
| - **π User Accounts**: Passphrase-based authentication with saved preferences | |
| ## π§ How It Works | |
| ### MCP Servers (Modular Tools) | |
| Three specialized **MCP (Model Context Protocol)** servers act as tools: | |
| - **Music Server**: Searches YouTube for music tracks matching your preferences | |
| - **News Server**: Fetches real-time news from RSS feeds | |
| - **Podcast Server**: Discovers podcasts on YouTube | |
| ### LLM (Large Language Model) | |
| **Nebius GPT-OSS-120B** generates all text content: | |
| - Personalized host commentary and introductions | |
| - Conversational news scripts | |
| - Entertaining stories and fun facts | |
| - All adapted to your mood and preferences | |
| ### RAG System (Retrieval-Augmented Generation) | |
| **LlamaIndex-powered RAG** provides context-aware personalization: | |
| - Stores your preferences and listening history | |
| - Retrieves context for better recommendations | |
| - Learns from your behavior to improve suggestions over time | |
| ## π οΈ Technology Stack | |
| - **Gradio**: Beautiful, interactive UI | |
| - **Nebius GPT-OSS-120B** (OpenAI-compatible): LLM for content generation | |
| - **ElevenLabs**: High-quality text-to-speech for voice generation | |
| - **LlamaIndex**: RAG system for personalized recommendations | |
| - **MCP Servers**: Modular tools for music, news, and podcasts | |
| - **yt-dlp**: YouTube music and podcast search | |
| - **YouTube/SoundCloud**: Music and podcast streaming | |
| ## π Built for MCP 1st Birthday Competition | |
| This app demonstrates: | |
| - β **Autonomous Agent Behavior**: Planning, reasoning, and execution | |
| - β **MCP Servers as Tools**: Modular music, news, and podcast servers | |
| - β **RAG System**: Context-aware personalization with LlamaIndex | |
| - β **LLM Integration**: Content generation with Nebius GPT-OSS-120B | |
| - β **Gradio Interface**: Seamless user experience | |
| ## π How to Use | |
| 1. **Create Account**: Get your unique passphrase (saved in browser) | |
| 2. **Set Preferences**: Choose genres, interests, and mood | |
| 3. **Start Radio**: Click "Generate & Play" to begin your personalized show | |
| 4. **Interact**: Like/dislike tracks, request songs by voice | |
| 5. **Track Stats**: View your listening history and statistics | |
| --- | |
| Made with β€οΈ for the MCP 1st Birthday Competition | |
| """) | |
| # Footer | |
# Static, centered HTML block with the app tagline and the competition
# track; styling is inline and nothing here is bound to a name.
| gr.HTML(""" | |
| <div style="text-align: center; padding: 2rem; margin-top: 2rem; border-top: 1px solid #ddd;"> | |
| <p style="color: #666;">π΅ AI Radio - Personalized Radio for Everyone π΅</p> | |
| <p style="color: #999; font-size: 0.9em;">Track: MCP in Action - Consumer Applications</p> | |
| </div> | |
| """) | |
# Script entry point: start the Gradio server when this file is run directly.
# Note: For HuggingFace Spaces, use app.py at root level instead
if __name__ == "__main__":
    import os

    # PORT comes from the environment when set; otherwise fall back to 7871.
    port = int(os.getenv("PORT", 7871))

    # Bind on all interfaces and do not create a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": port,
        "share": False,
    }
    demo.launch(**launch_options)