"""AI Radio - Personalized Radio Station with AI Host""" import gradio as gr import os import json import time import re from typing import Dict, Any, List, Optional import threading from pydub import AudioSegment # Get project root directory (parent of src/) PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) AUDIO_DIR = os.path.join(PROJECT_ROOT, "audio") LOG_DIR = os.path.join(PROJECT_ROOT, "logs") os.makedirs(AUDIO_DIR, exist_ok=True) os.makedirs(LOG_DIR, exist_ok=True) from config import get_config from radio_agent import RadioAgent from tts_service import TTSService from rag_system import RadioRAGSystem from voice_input import VoiceInputService from user_memory import UserMemoryService def create_youtube_player_html(youtube_id: str, title: str, artist: str, url: str) -> str: """Create YouTube player iframe HTML""" return f""" """ def create_podcast_player_html(youtube_id: str, title: str, host: str, url: str) -> str: """Create podcast YouTube player iframe HTML""" return f""" """ def create_placeholder_html(title: str, artist: str, delay_s: float, media_type: str = "music") -> str: """Create placeholder HTML shown while waiting for host speech to end""" bg_color = "#1a1a2e" if media_type == "music" else "#6b46c1" icon = "🎡" if media_type == "music" else "πŸŽ™οΈ" return f"""

{icon} {title}

by {artist}

⏳

Video starts in {delay_s:.0f}s...

After host finishes speaking

""" def get_pending_player(): """Check if there's a pending player ready to show""" pending = radio_state.get("pending_player") if pending and time.time() >= pending.get("ready_at", 0): html = pending.get("html", "") radio_state["pending_player"] = None # Clear it return html return None # Global state radio_state = { "is_playing": False, "is_stopped": False, # User explicitly stopped (vs just paused) "current_segment_index": 0, "planned_show": [], "user_preferences": {}, "stop_flag": False, "current_news_batches": [], "news_total_batches": 0, "news_batches_played": 0, "last_segment": None, "current_track": None, # Currently playing track for like/dislike "user_id": None, # Current user ID "auto_play": True, # Auto-play next segment when current finishes "content_filter": { "music": True, "news": True, "podcasts": True, "stories": True }, # Pending media player (to be added after host speech ends) "pending_player": None, # {"html": "...", "ready_at": timestamp} } # Initialize services config = get_config() agent = RadioAgent(config) # Use open-source local TTS instead of ElevenLabs tts_service = TTSService(api_key=getattr(config, "elevenlabs_api_key", ""), voice_id=getattr(config, "elevenlabs_voice_id", "")) voice_input_service = VoiceInputService() user_memory = UserMemoryService() def save_preferences(name: str, favorite_genres: List[str], interests: List[str], podcast_interests: List[str], mood: str, music_filter: bool, news_filter: bool, podcast_filter: bool, story_filter: bool, voice_name: str = None) -> str: """Save user preferences to RAG system""" # Initialize user if not already done if not radio_state.get("user_id"): user_id, _, _ = user_memory.get_or_create_user() radio_state["user_id"] = user_id print(f"πŸ‘€ User initialized: {radio_state['user_id']}") # Map voice name to voice_id if needed from tts_service import VOICE_OPTIONS selected_voice_id = "21m00Tcm4TlvDq8ikWAM" # Default Rachel if voice_name and voice_name in VOICE_OPTIONS: selected_voice_id = VOICE_OPTIONS[voice_name] preferences = { "name": name or "Friend", "favorite_genres": favorite_genres or ["pop"], "interests": interests or ["world"], "podcast_interests": podcast_interests or ["technology"], "mood": mood or "happy", "voice_id": selected_voice_id, "voice_name": voice_name or "Rachel (Female)", "content_filter": { "music": music_filter, "news": news_filter, "podcasts": podcast_filter, "stories": story_filter }, "timestamp": time.time() } # Update content filter radio_state["content_filter"] = preferences["content_filter"] radio_state["user_preferences"] = preferences # Update TTS service with new voice global tts_service tts_service.voice_id = selected_voice_id print(f"🎀 Voice updated to: {voice_name} (ID: {selected_voice_id})") # Save to RAG system with user_id user_id = radio_state.get("user_id") agent.rag_system.store_user_preferences(preferences, user_id=user_id) # Save to user memory for persistence if user_id: user_memory.save_user_preferences(user_id, preferences) print(f"πŸ’Ύ Preferences saved for user {user_id}") return f"βœ… Preferences saved! Welcome, {preferences['name']}! Your personalized radio is ready." def get_saved_preferences() -> tuple: """Get saved preferences for the current user to autofill the form Returns: Tuple of (name, genres, interests, podcast_interests, mood, music, news, podcast, story, voice_name) """ user_id = radio_state.get("user_id") if not user_id: # Return defaults return "Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy", True, True, True, True, "Rachel (Female)" prefs = user_memory.get_user_preferences(user_id) if not prefs: return "Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy", True, True, True, True, "Rachel (Female)" # Extract content filter content_filter = prefs.get("content_filter", {}) # Get voice name (prefer stored voice_name, otherwise map from voice_id) voice_name = prefs.get("voice_name", "Rachel (Female)") if not voice_name or voice_name not in ["Rachel (Female)", "Lera (Female)", "Bella (Female)", "Antoni (Male)", "Arnold (Male)", "Adam (Male)", "Domi (Female)", "Elli (Female)", "Josh (Male)", "Sam (Male)"]: # Try to map voice_id back to name from tts_service import VOICE_OPTIONS voice_id = prefs.get("voice_id", "21m00Tcm4TlvDq8ikWAM") for name, vid in VOICE_OPTIONS.items(): if vid == voice_id: voice_name = name break else: voice_name = "Rachel (Female)" # Default fallback return ( prefs.get("name", "Friend"), prefs.get("favorite_genres", ["pop", "rock"]), prefs.get("interests", ["technology", "world"]), prefs.get("podcast_interests", ["technology"]), prefs.get("mood", "happy"), content_filter.get("music", True), content_filter.get("news", True), content_filter.get("podcasts", True), content_filter.get("stories", True), voice_name ) def start_radio_stream(): """Start the radio stream""" # Use saved preferences, or set defaults if none exist if not radio_state["user_preferences"]: # Try to load from user memory user_id = radio_state.get("user_id") if user_id: saved = user_memory.get_user_preferences(user_id) if saved: radio_state["user_preferences"] = saved radio_state["content_filter"] = saved.get("content_filter", { "music": True, "news": True, "podcasts": True, "stories": True }) # Still no preferences? Use defaults if not radio_state["user_preferences"]: radio_state["user_preferences"] = { "name": "Friend", "favorite_genres": ["pop", "rock"], "interests": ["technology", "world"], "podcast_interests": ["technology"], "mood": "happy", "voice_id": "21m00Tcm4TlvDq8ikWAM", "voice_name": "Rachel (Female)" } radio_state["content_filter"] = { "music": True, "news": True, "podcasts": True, "stories": True } if radio_state["is_playing"]: return "πŸ“» Radio is already playing!", None, None, None, "" # Plan the show with content filter user_id = radio_state.get("user_id") show_plan = agent.plan_radio_show( user_id=user_id, user_preferences=radio_state["user_preferences"], duration_minutes=30, content_filter=radio_state["content_filter"] ) radio_state["planned_show"] = show_plan radio_state["current_segment_index"] = 0 radio_state["is_playing"] = True radio_state["stop_flag"] = False radio_state["current_news_batches"] = [] radio_state["news_total_batches"] = 0 radio_state["news_batches_played"] = 0 radio_state["last_segment"] = None return "🎡 Starting your personalized radio show...", None, None, None, "" def show_pending_player(): """Called by timer to finally show the media player""" player_html = radio_state.get("pending_player_html", "") print("⏰ Timer fired: Showing pending media player now") # Return player HTML and disable timer return player_html, gr.Timer(value=0, active=False) def start_and_play_first_segment(): """ One-shot helper for the UI: - If resuming from stop: continue from next segment - If fresh start: Plan the show and play first segment Returns everything needed to update the UI in a single call. """ print("▢️ [start_and_play_first_segment] Starting...") # Initialize user if not already done if not radio_state.get("user_id"): user_id, _, _ = user_memory.get_or_create_user() radio_state["user_id"] = user_id print(f"πŸ‘€ User initialized: {radio_state['user_id']}") # Use saved preferences, or set defaults if none exist if not radio_state["user_preferences"]: # Try to load from user memory user_id = radio_state.get("user_id") if user_id: saved = user_memory.get_user_preferences(user_id) if saved: radio_state["user_preferences"] = saved radio_state["content_filter"] = saved.get("content_filter", { "music": True, "news": True, "podcasts": True, "stories": True }) # Update TTS service with saved voice voice_id = saved.get("voice_id", "21m00Tcm4TlvDq8ikWAM") tts_service.voice_id = voice_id print(f"πŸ“‚ Loaded saved preferences for user {user_id}") print(f"🎀 Voice set to: {saved.get('voice_name', 'Rachel (Female)')} (ID: {voice_id})") # Still no preferences? Use defaults if not radio_state["user_preferences"]: radio_state["user_preferences"] = { "name": "Friend", "favorite_genres": ["pop", "rock"], "interests": ["technology", "world"], "podcast_interests": ["technology"], "mood": "happy", "voice_id": "21m00Tcm4TlvDq8ikWAM", "voice_name": "Rachel (Female)" } radio_state["content_filter"] = { "music": True, "news": True, "podcasts": True, "stories": True } # Update TTS service with default voice tts_service.voice_id = "21m00Tcm4TlvDq8ikWAM" print("πŸ“‹ Using default preferences") # Check if resuming from stopped state (only if show was manually stopped, not completed) if radio_state.get("is_stopped") and radio_state.get("planned_show") and radio_state["current_segment_index"] < len(radio_state["planned_show"]): print("▢️ Resuming from stopped state...") radio_state["is_playing"] = True radio_state["is_stopped"] = False radio_state["stop_flag"] = False # Play next segment (continuing from where we stopped) segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment() return ( "▢️ Resuming radio...", segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config ) # If show completed or no valid show to resume, start fresh # Also check if is_playing is True but show is actually completed if radio_state.get("is_playing") and radio_state.get("planned_show"): # Check if show is actually completed if radio_state["current_segment_index"] >= len(radio_state["planned_show"]): # Show completed, reset and start fresh print("πŸ”„ Show completed, resetting for new show...") reset_radio() else: # Show is actually playing return ( "πŸ“» Radio is already playing!", "**Welcome!** Set your preferences and start the radio.", None, None, "Ready to start", "πŸ“» AI Radio", "Model-generated script will appear here for each segment.", gr.Timer(value=0, active=False) ) elif radio_state.get("is_playing"): # is_playing is True but no planned show - reset print("πŸ”„ is_playing=True but no planned_show, resetting...") reset_radio() import time t0 = time.time() # Fresh start - reset and plan new show reset_radio() # Step 1: Plan skeleton show (YouTube searches happen here) print(" [1/3] Planning show (searching YouTube)...") user_id = radio_state.get("user_id") show_plan = agent.plan_radio_show( user_id=user_id, user_preferences=radio_state["user_preferences"], duration_minutes=30, content_filter=radio_state["content_filter"] ) print(f" [1/3] Done in {time.time()-t0:.1f}s, segments: {len(show_plan)}") radio_state["planned_show"] = show_plan radio_state["current_segment_index"] = 0 radio_state["is_playing"] = True radio_state["stop_flag"] = False radio_state["is_stopped"] = False # Step 2 & 3: Generate and play first segment (LLM + TTS inside play_next_segment) print(" [2/3] Generating first segment (LLM)...") t1 = time.time() segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment() print(f" [2/3] Done in {time.time()-t1:.1f}s") print(f" [3/3] Total time: {time.time()-t0:.1f}s") print(f" Host audio file: {host_audio}") return ( "🎡 Starting your personalized radio show...", segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config ) def play_next_segment(): """Play the next segment in the show - returns host audio and music audio separately""" # If stopped, resume first if radio_state.get("is_stopped"): print("▢️ Resuming from stopped state in play_next_segment...") radio_state["is_stopped"] = False radio_state["stop_flag"] = False radio_state["is_playing"] = True if not radio_state["is_playing"]: # If we have a planned show, try to resume if radio_state.get("planned_show"): radio_state["is_playing"] = True radio_state["stop_flag"] = False else: return "⏸️ No show planned. Click 'Generate & Play' to start.", None, None, None, "", "", gr.Timer(value=0, active=False) if radio_state["stop_flag"]: radio_state["is_playing"] = False return "⏸️ Radio paused", None, None, None, "", "", gr.Timer(value=0, active=False) # If there are remaining news TTS batches, play the next one without advancing to a new segment if radio_state.get("current_news_batches"): segment = radio_state.get("last_segment") if segment: batch_index = radio_state.get("news_batches_played", 1) + 1 next_batch = radio_state["current_news_batches"].pop(0) radio_state["news_batches_played"] = batch_index host_audio_file = None if getattr(tts_service, "available", True): audio_bytes = tts_service.text_to_speech(next_batch) if audio_bytes: host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_{batch_index}.mp3") tts_service.save_audio(audio_bytes, host_audio_file) # Build display info (same news segment) segment_info = format_segment_info(segment) raw_script = get_segment_text(segment) script_batches = segment.get("script_batches", segment.get("script", [])) if isinstance(script_batches, list) and script_batches: display_script = "\n\n---\n\n".join(str(b) for b in script_batches) else: display_script = str(raw_script) if raw_script else "" total = radio_state.get("news_total_batches", 1) or 1 progress = f"News batch {batch_index}/{total}" # When no more batches, clear counters so next click advances to next segment if not radio_state["current_news_batches"]: radio_state["news_total_batches"] = 0 radio_state["news_batches_played"] = 0 return segment_info, host_audio_file, "", progress, get_now_playing(segment), display_script, gr.Timer(value=0, active=False) if radio_state["current_segment_index"] >= len(radio_state["planned_show"]): radio_state["is_playing"] = False return "🎊 Show completed! Hope you enjoyed it.", None, None, None, "", "", gr.Timer(value=0, active=False) # Get current segment segment_index = radio_state["current_segment_index"] segment = radio_state["planned_show"][segment_index] radio_state["current_segment_index"] += 1 # Lazily generate LLM content for this segment user_prefs = radio_state["user_preferences"].copy() user_id = radio_state.get("user_id") if user_id: user_prefs["_user_id"] = user_id # Pass user_id in preferences for RAG queries segment = agent.enrich_segment(segment, user_prefs) radio_state["planned_show"][segment_index] = segment # Execute segment (log to RAG) agent.execute_segment(segment, user_id=user_id) radio_state["last_segment"] = segment # Generate content display and underlying script text segment_info = format_segment_info(segment) raw_script = get_segment_text(segment) # Generate TTS for host commentary host_audio_file = None music_player_html = "" if segment["type"] in ["intro", "outro", "story"]: text = raw_script if text and getattr(tts_service, "available", True): audio_bytes = tts_service.text_to_speech(text) if audio_bytes: host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}.mp3") tts_service.save_audio(audio_bytes, host_audio_file) elif segment["type"] == "news": # Handle batched news generation script_batches = segment.get("script_batches", segment.get("script", [])) if isinstance(script_batches, list) and script_batches: # Generate first batch immediately (user doesn't wait) first_batch = script_batches[0] if getattr(tts_service, "available", True): audio_bytes = tts_service.text_to_speech(first_batch) if audio_bytes: host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_1.mp3") tts_service.save_audio(audio_bytes, host_audio_file) # Store remaining batches for sequential playback segment["remaining_batches"] = script_batches[1:] if len(script_batches) > 1 else [] else: # Fallback for old format text = raw_script if text and getattr(tts_service, "available", True): audio_bytes = tts_service.text_to_speech(text) if audio_bytes: host_audio_file = f"segment_{radio_state['current_segment_index']}.mp3" tts_service.save_audio(audio_bytes, host_audio_file) elif segment["type"] == "music": # For music: generate host commentary, then get music commentary = segment.get("commentary", "") # Generate host commentary if commentary and getattr(tts_service, "available", True): audio_bytes = tts_service.text_to_speech(commentary) if audio_bytes: host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_host.mp3") tts_service.save_audio(audio_bytes, host_audio_file) # Get music track - check if we should play a liked track track = segment.get("track", {}) user_id = radio_state.get("user_id") # 30% chance to play a liked track if user has any if user_id and user_memory.should_play_liked_track(user_id, probability=0.3): genre = track.get("genre", "") if track else "" liked_track = user_memory.get_random_liked_track(user_id, genre) if liked_track: print(f"❀️ Playing liked track: {liked_track.get('title', 'Unknown')}") track = liked_track segment["track"] = liked_track segment["from_liked"] = True if track: # Store current track for like/dislike buttons radio_state["current_track"] = track # Add to user history if user_id: user_memory.add_to_history(user_id, track) print(f"🎡 Preparing music: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}") # Get streaming URL for YouTube or SoundCloud # Modal proxy returns "youtube_api" as source, so check both if track.get("source") in ["youtube", "youtube_api"] and track.get("url"): try: # Extract YouTube ID youtube_id = track.get("youtube_id", "") if not youtube_id and "v=" in track["url"]: youtube_id = track["url"].split("v=")[-1].split("&")[0] if youtube_id: # # COMMENTED OUT: Delayed loading mechanism # # Get host audio duration to calculate delay # delay_ms = 0 # delay_s = 0.0 # if host_audio_file and os.path.exists(host_audio_file): # try: # audio = AudioSegment.from_file(host_audio_file) # duration_ms = len(audio) # delay_ms = max(0, duration_ms - 3000) # Start 3 seconds before end # delay_s = delay_ms / 1000.0 # print(f"⏱️ Host audio: {duration_ms}ms, YouTube delay: {delay_ms}ms") # except Exception as e: # print(f"⚠️ Could not get audio duration: {e}") # # Store the actual iframe HTML for later (after host speech ends) # actual_player_html = create_youtube_player_html( # youtube_id, # track.get('title', 'Unknown'), # track.get('artist', 'Unknown'), # track['url'] # ) # # Schedule the player to be ready after the delay # radio_state["pending_player"] = { # "html": actual_player_html, # "ready_at": time.time() + delay_s, # "type": "music", # "title": track.get('title', 'Unknown') # } # # Return placeholder initially (no iframe yet!) # music_player_html = create_placeholder_html( # track.get('title', 'Unknown'), # track.get('artist', 'Unknown'), # delay_s, # "music" # ) # print(f"βœ… YouTube player scheduled: {track.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s") # Simple immediate iframe - start track with host music_player_html = create_youtube_player_html( youtube_id, track.get('title', 'Unknown'), track.get('artist', 'Unknown'), track['url'] ) print(f"βœ… YouTube player created immediately: {track.get('title', 'Unknown')} (ID: {youtube_id})") else: # Fallback: Just show link music_player_html = f"""

🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}

Click to listen:

▢️ Play on YouTube
""" except Exception as e: print(f"❌ Error getting music: {e}") # Fallback: Just show YouTube link music_player_html = f"""

🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}

Click to listen:

▢️ Play on YouTube
""" elif track.get("source") == "soundcloud" and track.get("url"): # Simple SoundCloud embed player try: music_player_html = f"""

🎡 {track.get('title', 'Unknown')}

by {track.get('artist', 'Unknown')}

πŸ”— Open on SoundCloud

""" print(f"βœ… Streaming from SoundCloud: {track.get('title', 'Unknown')}") except Exception as e: print(f"❌ Error with SoundCloud: {e}") music_player_html = f"""

🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}

▢️ Play on SoundCloud
""" else: # Demo track - no actual audio file print("ℹ️ Demo track (no audio file available)") music_player_html = "" elif segment["type"] == "podcast": # For podcast: generate intro and create YouTube player intro = segment.get("intro", "") podcast = segment.get("podcast", {}) # Initialize music_player_html for podcast music_player_html = "" if intro and getattr(tts_service, "available", True): audio_bytes = tts_service.text_to_speech(intro) if audio_bytes: host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_host.mp3") tts_service.save_audio(audio_bytes, host_audio_file) # Create YouTube player for podcast if available # Modal proxy returns "youtube_api" as source, so check both print(f"πŸ” Podcast debug: source={podcast.get('source')}, youtube_id={podcast.get('youtube_id')}, url={podcast.get('url', '')[:50]}") if podcast.get("source") in ["youtube", "youtube_api"] and podcast.get("youtube_id"): youtube_id = podcast.get("youtube_id", "") print(f"βœ… Creating podcast iframe with youtube_id: {youtube_id}") # # COMMENTED OUT: Delayed loading mechanism # # Get host audio duration to calculate delay # delay_ms = 0 # if host_audio_file and os.path.exists(host_audio_file): # try: # audio = AudioSegment.from_file(host_audio_file) # duration_ms = len(audio) # delay_ms = max(0, duration_ms - 3000) # Start 3 seconds before end # print(f"⏱️ Podcast host audio: {duration_ms}ms, YouTube delay: {delay_ms}ms") # except Exception as e: # print(f"⚠️ Could not get audio duration: {e}") # # Store the actual iframe HTML for later (after host speech ends) # delay_s = delay_ms / 1000.0 # actual_player_html = create_podcast_player_html( # youtube_id, # podcast.get('title', 'Podcast'), # podcast.get('host', 'Unknown Host'), # podcast.get('url', '#') # ) # # Schedule the player to be ready after the delay # radio_state["pending_player"] = { # "html": actual_player_html, # "ready_at": time.time() + delay_s, # "type": "podcast", # "title": podcast.get('title', 'Unknown') # } # # Return placeholder initially (no iframe yet!) # music_player_html = create_placeholder_html( # podcast.get('title', 'Podcast'), # podcast.get('host', 'Unknown Host'), # delay_s, # "podcast" # ) # print(f"βœ… Podcast YouTube player scheduled: {podcast.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s") # Simple immediate iframe - start podcast with host music_player_html = create_podcast_player_html( youtube_id, podcast.get('title', 'Podcast'), podcast.get('host', 'Unknown Host'), podcast.get('url', '#') ) print(f"βœ… Podcast YouTube player created immediately: {podcast.get('title', 'Unknown')} (ID: {youtube_id})") print(f"πŸ“¦ Podcast iframe HTML length: {len(music_player_html)} chars, contains iframe: {' str: """Format segment information for display""" seg_type = segment["type"] if seg_type == "intro": return f"πŸŽ™οΈ **Welcome to AI Radio!**\n\n{segment['content']}" elif seg_type == "outro": return f"πŸ‘‹ **Thanks for Listening!**\n\n{segment['content']}" elif seg_type == "music": track = segment.get("track", {}) if track: music_info = f"""🎡 **Now Playing** **{track['title']}** by {track['artist']} Genre: {track['genre']} Duration: {track.get('duration', 'Unknown')}s *{segment.get('commentary', '')}* """ # Add YouTube link if available if segment.get("_youtube_url"): music_info += f"\n\nπŸ”— [Listen on YouTube]({segment['_youtube_url']})" elif track.get("url") and "youtube" in track.get("url", ""): music_info += f"\n\nπŸ”— [Listen on YouTube]({track['url']})" return music_info return "🎡 Music Time!" elif seg_type == "news": news_items = segment.get("news_items", []) news_text = "πŸ“° **News Update**\n\n" # Handle batched script (list) or old format (string) script = segment.get("script", segment.get("script_batches", "")) if isinstance(script, list): # Join all batches for display news_text += " ".join(script) else: # Old format - string news_text += str(script) if script else "" news_text += "\n\n**Headlines:**\n" for item in news_items[:2]: news_text += f"\nβ€’ {item.get('title', 'No title')}" return news_text elif seg_type == "podcast": podcast = segment.get("podcast", {}) if podcast: return f"""πŸŽ™οΈ **Podcast Recommendation** **{podcast['title']}** Hosted by {podcast['host']} {podcast['description']} Duration: {podcast['duration']} Rating: {'⭐' * int(podcast.get('rating', 4))} *{segment.get('intro', '')}* """ return "πŸŽ™οΈ Podcast Time!" elif seg_type == "story": return f"πŸ“– **Story Time**\n\n{segment.get('content', '')}" return "πŸ“» Radio Segment" def get_segment_text(segment: Dict[str, Any]) -> str: """Extract text for TTS from segment""" seg_type = segment["type"] if seg_type in ["intro", "outro", "story"]: return str(segment.get("content", "")) elif seg_type == "news": # Handle batched news script_batches = segment.get("script_batches", segment.get("script", [])) if isinstance(script_batches, list): if script_batches: return " ".join(str(batch) for batch in script_batches) # Join all batches return "" return str(script_batches) if script_batches else "" # Fallback elif seg_type == "music": return str(segment.get("commentary", "")) elif seg_type == "podcast": return str(segment.get("intro", "")) return "" def split_text_into_segments(text: str, max_sentences: int = 2) -> List[str]: """Split long LLM answers into smaller sentence-based segments for UI display.""" if not text: return [] # Split by sentence endings, keep it simple sentences = re.split(r'(?<=[.!?])\s+', text.strip()) segments: List[str] = [] current: List[str] = [] for s in sentences: s = s.strip() if not s: continue current.append(s) if len(current) >= max_sentences: segments.append(" ".join(current)) current = [] if current: segments.append(" ".join(current)) return segments def handle_voice_request(audio_file): """Handle voice input request for song from uploaded audio file""" if not audio_file: return "⚠️ Please record your voice request first!", None, "", gr.Timer(value=0, active=False) try: # Use speech_recognition to process the audio file import speech_recognition as sr from pydub import AudioSegment recognizer = sr.Recognizer() # Convert audio to WAV format if needed (speech_recognition requires WAV) audio_path = audio_file if isinstance(audio_file, tuple): # Gradio Audio returns (sample_rate, audio_data) or filepath audio_path = audio_file[1] if len(audio_file) > 1 else audio_file[0] # If it's not a WAV file, convert it if audio_path and not audio_path.endswith('.wav'): try: # Load and convert to WAV audio = AudioSegment.from_file(audio_path) wav_path = audio_path.rsplit('.', 1)[0] + '.wav' audio.export(wav_path, format="wav") audio_path = wav_path except Exception as e: print(f"⚠️ Could not convert audio: {e}, trying original file") # Load audio file for recognition try: with sr.AudioFile(audio_path) as source: audio = recognizer.record(source) except Exception as e: # Try with pydub conversion first try: audio_seg = AudioSegment.from_file(audio_path) wav_temp = os.path.join(AUDIO_DIR, f"temp_voice_{int(time.time())}.wav") audio_seg.export(wav_temp, format="wav") with sr.AudioFile(wav_temp) as source: audio = recognizer.record(source) audio_path = wav_temp except Exception as conv_e: return f"❌ Could not process audio file: {conv_e}. Please try recording again.", None, "", gr.Timer(value=0, active=False) # Recognize speech using Google's API try: recognized_text = recognizer.recognize_google(audio) print(f"🎀 Recognized: {recognized_text}") except sr.UnknownValueError: return "❌ Could not understand audio. Please speak clearly and try again.", None, "", gr.Timer(value=0, active=False) except sr.RequestError as e: return f"❌ Error with speech recognition service: {e}. Please try again.", None, "", gr.Timer(value=0, active=False) # Process the request song_request = voice_input_service.process_song_request(recognized_text) print(f"🎀 Processed request: {song_request}") # Search for music tracks = agent.music_server.search_by_request(song_request) if not tracks: return f"❌ Could not find music for: '{recognized_text}'. Try saying something like 'play pop music' or 'play a song by [artist name]'!", None, "", gr.Timer(value=0, active=False) # Get the first matching track track = tracks[0] print(f"🎡 Selected track: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}") # Store current track for like/dislike buttons radio_state["current_track"] = track # Add to user history user_id = radio_state.get("user_id") if user_id: user_memory.add_to_history(user_id, track) # Generate host response using LLM user_prefs = radio_state.get("user_preferences", {}) host_response = agent.generate_song_request_response(recognized_text, track, user_prefs) # Generate TTS for host response audio_file = None print(f"πŸŽ™οΈ Generating TTS for: {host_response[:50]}...") if getattr(tts_service, "available", True): audio_bytes = tts_service.text_to_speech(host_response) if audio_bytes: audio_file = os.path.join(AUDIO_DIR, f"voice_request_{int(time.time())}.mp3") tts_service.save_audio(audio_bytes, audio_file) print(f"βœ… Audio saved to: {audio_file} ({len(audio_bytes)} bytes)") else: print("❌ TTS returned no audio bytes") else: print("❌ TTS service not available") # Create music player HTML with volume fading (0β†’30% during host speech, then 30%β†’100%) music_player_html = "" player_id = f"voice_player_{int(time.time())}" timer_config = gr.Timer(value=0, active=False) # Modal proxy returns "youtube_api" as source, so check both if track.get("source") in ["youtube", "youtube_api"]: youtube_id = track.get("youtube_id", "") if not youtube_id and "v=" in track.get("url", ""): youtube_id = track["url"].split("v=")[-1].split("&")[0] if youtube_id: # Delayed loading mechanism with gr.Timer try: # Get host audio duration to calculate delay delay_s = 0.5 if audio_file and os.path.exists(audio_file): audio = AudioSegment.from_file(audio_file) duration_ms = len(audio) delay_s = max(0.5, (duration_ms / 1000.0) - 3.0) # Start 3 seconds before end print(f"⏱️ Voice request audio: {duration_ms}ms, YouTube delay: {delay_s:.1f}s") # Create the actual player HTML actual_player_html = create_youtube_player_html( youtube_id, track.get('title', 'Unknown'), track.get('artist', 'Unknown'), track['url'] ) # Store for timer callback radio_state["pending_player_html"] = actual_player_html # Create placeholder music_player_html = create_placeholder_html( track.get('title', 'Unknown'), track.get('artist', 'Unknown'), delay_s, "music" ) # Activate timer timer_config = gr.Timer(value=delay_s, active=True) print(f"βœ… Voice request: Player scheduled in {delay_s:.1f}s") except Exception as e: print(f"⚠️ Error calculating delay for voice request: {e}") # Fallback: immediate play music_player_html = create_youtube_player_html( youtube_id, track.get('title', 'Unknown'), track.get('artist', 'Unknown'), track['url'] ) else: music_player_html = f"""

🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}

▢️ Play on YouTube
""" elif track.get("source") == "soundcloud": # SoundCloud embed with simple iframe (no script for volume control) music_player_html = f"""

🎡 {track.get('title', 'Unknown')}

by {track.get('artist', 'Unknown')}

πŸ”— Open on SoundCloud

""" # Note: Gradio sanitizes script tags, so autoplay scripts don't work # The browser may block autoplay - user may need to click play # Return with player return f"βœ… {host_response}\n\n🎡 Playing: {track['title']} by {track['artist']}", audio_file, music_player_html, timer_config except Exception as e: return f"❌ Error processing voice request: {e}", None, "", gr.Timer(value=0, active=False) def get_now_playing(segment: Dict[str, Any]) -> str: """Get now playing text""" seg_type = segment["type"] if seg_type == "music": track = segment.get("track", {}) if track: return f"β™ͺ {track['title']} - {track['artist']}" elif seg_type == "news": return "πŸ“° News Update" elif seg_type == "podcast": podcast = segment.get("podcast", {}) if podcast: return f"πŸŽ™οΈ {podcast['title']}" elif seg_type == "story": return "πŸ“– Story Time" elif seg_type == "intro": return "πŸŽ™οΈ Welcome!" elif seg_type == "outro": return "πŸ‘‹ Goodbye!" return "πŸ“» AI Radio" def init_user(stored_user_id: str = None) -> tuple: """Initialize user from stored ID or create new one Args: stored_user_id: User ID from browser localStorage/cookie Returns: Tuple of (user_id, passphrase, is_new_user) """ user_id, passphrase, is_new = user_memory.get_or_create_user(stored_user_id) radio_state["user_id"] = user_id # Load user preferences if they exist saved_prefs = user_memory.get_user_preferences(user_id) if saved_prefs: radio_state["user_preferences"] = saved_prefs return user_id, passphrase, is_new def authenticate_with_passphrase(passphrase: str) -> tuple: """Authenticate user with passphrase Args: passphrase: User's passphrase (e.g. "swift-tiger-aurora") Returns: Tuple of (success_message, user_id, passphrase) """ if not passphrase or not passphrase.strip(): return "⚠️ Please enter your passphrase", "", "" passphrase = passphrase.strip().lower() user_id = user_memory.get_user_by_passphrase(passphrase) if user_id: radio_state["user_id"] = user_id # Load user preferences saved_prefs = user_memory.get_user_preferences(user_id) if saved_prefs: radio_state["user_preferences"] = saved_prefs stats = user_memory.get_user_stats(user_id) return f"βœ… Welcome back! Logged in successfully.\n\nπŸ“Š Your stats:\nβ€’ Liked tracks: {stats.get('total_likes', 0)}\nβ€’ Play history: {stats.get('total_plays', 0)} tracks", user_id, passphrase else: return "❌ Invalid passphrase. Please check and try again.", "", "" def create_new_account() -> tuple: """Create a new user account Returns: Tuple of (message, user_id, passphrase) """ user_id, passphrase, _ = user_memory.get_or_create_user() radio_state["user_id"] = user_id message = f"""πŸŽ‰ Welcome to AI Radio! Your personal passphrase is: πŸ”‘ **{passphrase}** ⚠️ **Save this passphrase!** You'll need it to access your account from other devices or if cookies are cleared.""" return message, user_id, passphrase def like_current_track() -> str: """Like the currently playing track""" user_id = radio_state.get("user_id") track = radio_state.get("current_track") if not user_id: return "⚠️ User not initialized" if not track: return "⚠️ No track currently playing" if user_memory.like_track(user_id, track): return f"πŸ‘ Liked: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}" return "❌ Could not save like" def dislike_current_track() -> str: """Dislike the currently playing track""" user_id = radio_state.get("user_id") track = radio_state.get("current_track") if not user_id: return "⚠️ User not initialized" if not track: return "⚠️ No track currently playing" if user_memory.dislike_track(user_id, track): return f"πŸ‘Ž Won't play again: {track.get('title', 'Unknown')}" return "❌ Could not save dislike" def get_liked_tracks_display() -> str: """Get formatted list of liked tracks for display""" user_id = radio_state.get("user_id") if not user_id: return "No liked tracks yet. Start listening and like some songs!" liked = user_memory.get_liked_tracks(user_id) if not liked: return "No liked tracks yet. Like some songs while listening!" lines = ["### ❀️ Your Liked Tracks\n"] for i, track in enumerate(liked[-20:], 1): # Show last 20 lines.append(f"{i}. **{track.get('title', 'Unknown')}** - {track.get('artist', 'Unknown')}") if len(liked) > 20: lines.append(f"\n*...and {len(liked) - 20} more liked tracks*") return "\n".join(lines) def get_user_stats_display() -> str: """Get user stats for display""" user_id = radio_state.get("user_id") if not user_id: return "Start listening to see your stats!" stats = user_memory.get_user_stats(user_id) liked_count = len(user_memory.get_liked_tracks(user_id)) return f"""### πŸ“Š Your Stats 🎡 **Total Tracks Played:** {stats.get('total_plays', 0)} πŸ‘ **Liked Tracks:** {liked_count} πŸ‘Ž **Disliked Tracks:** {stats.get('total_dislikes', 0)} *Your liked tracks have a 30% chance of playing during music segments!* """ def get_stats(): """Get listening statistics for the current user""" user_id = radio_state.get("user_id") if not user_id: return """πŸ“Š **Your Listening Stats** *Log in or create an account to track your listening history!* """ # Get user-specific stats from user_memory user_stats = user_memory.get_user_stats(user_id) liked_tracks = user_memory.get_liked_tracks(user_id) disliked_tracks = user_memory.get_disliked_tracks(user_id) play_history = user_memory.get_play_history(user_id) # Count by type from play history music_count = sum(1 for t in play_history if t.get('source') in ['youtube', 'soundcloud']) return f"""πŸ“Š **Your Listening Stats** πŸ‘€ **User ID:** {user_id} 🎡 **Total Tracks Played:** {user_stats.get('total_plays', 0)} πŸ‘ **Liked Tracks:** {len(liked_tracks)} πŸ‘Ž **Disliked Tracks:** {len(disliked_tracks)} πŸ“œ **Recent History:** {len(play_history)} tracks *Your liked tracks have a 30% chance of playing during music segments!* *Disliked tracks will be skipped automatically.* """ # Custom CSS for beautiful radio station UI custom_css = """ #radio-header { text-align: center; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 2rem; border-radius: 10px; margin-bottom: 1rem; } #now-playing { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); color: white; padding: 1.5rem; border-radius: 10px; text-align: center; font-size: 1.2em; font-weight: bold; margin: 1rem 0; } #segment-display { background: #f8f9fa; padding: 2rem; border-radius: 10px; min-height: 200px; border-left: 4px solid #667eea; } .control-button { font-size: 1.1em !important; padding: 0.8rem !important; } #stats-panel { background: linear-gradient(135deg, #84fab0 0%, #8fd3f4 100%); padding: 1.5rem; border-radius: 10px; color: #333; } """ # Auto-play JavaScript (injected in head) auto_play_head = """ """ # Build Gradio Interface with gr.Blocks(css=custom_css, title="AI Radio 🎡", theme=gr.themes.Soft(), head=auto_play_head) as demo: # Hidden state for user ID (persisted via localStorage) user_id_state = gr.State(value=None) # Header gr.HTML("""

🎡 AI Radio Station 🎡

Your Personal AI-Powered Radio Experience

Personalized Music β€’ News β€’ Podcasts β€’ Stories

""") with gr.Tabs() as tabs: # Tab 0: Account/Auth with gr.Tab("πŸ” Account", id=0): gr.Markdown("### 🎧 Welcome to AI Radio") # Hidden state for auth auth_user_id = gr.Textbox(visible=False, elem_id="auth_user_id") auth_passphrase = gr.Textbox(visible=False, elem_id="auth_passphrase") with gr.Group(): auth_status = gr.Markdown( value="*Loading your account...*", elem_id="auth_status" ) # Current passphrase display (shown after login/signup) passphrase_display = gr.Textbox( label="πŸ”‘ Your Passphrase (save this!)", value="", interactive=False, visible=True, elem_id="passphrase_display" ) gr.Markdown("---") gr.Markdown("### πŸ”“ Login with Passphrase") gr.Markdown("*Enter your passphrase to access your saved preferences, liked tracks, and history from any device.*") with gr.Row(): passphrase_input = gr.Textbox( label="Enter Passphrase", placeholder="e.g., swift-tiger-aurora", scale=3 ) login_btn = gr.Button("πŸ”‘ Login", variant="primary", scale=1) login_status = gr.Textbox(label="Login Status", interactive=False, visible=True) gr.Markdown("---") gr.Markdown("### πŸ†• New User?") gr.Markdown("*Click below to create a new account. You'll receive a unique passphrase to remember.*") create_account_btn = gr.Button("✨ Create New Account", variant="secondary") gr.Markdown("---") gr.Markdown(""" **πŸ’‘ How it works:** - If you are here for the first time, create a new account - Use the same passphrase on any device to access your account - Just enter your passphrase to log back in """) # Tab 1: Preferences with gr.Tab("βš™οΈ Your Preferences", id=1): gr.Markdown("### 🎯 Personalize Your Radio Experience") gr.Markdown("Tell us about your preferences so we can create the perfect radio show for you!") with gr.Row(): with gr.Column(): name_input = gr.Textbox( label="πŸ‘€ Your Name", placeholder="Enter your name", value="Friend" ) mood_input = gr.Dropdown( label="😊 Current Mood", choices=["happy", "energetic", "calm", "focused", "relaxed"], value="happy" ) # Add voice selection dropdown from tts_service import VOICE_OPTIONS voice_input = gr.Dropdown( label="🎀 Host Voice", choices=list(VOICE_OPTIONS.keys()), value="Rachel (Female)", info="Choose the voice for your radio host" ) with gr.Column(): genres_input = gr.Dropdown( label="🎡 Favorite Music Genres", choices=["pop", "rock", "jazz", "classical", "electronic", "country", "indie", "rap", "blues", "folk", "hip-hop"], multiselect=True, value=["rap"] ) interests_input = gr.Dropdown( label="πŸ“° News Interests", choices=["technology", "world", "business", "entertainment", "science", "sports"], multiselect=True, value=["technology", "world"] ) podcast_input = gr.Dropdown( label="πŸŽ™οΈ Podcast Interests", choices=["technology", "business", "comedy", "education", "news", "true-crime"], multiselect=True, value=["technology"] ) gr.Markdown("### πŸŽ›οΈ Content Filter") gr.Markdown("Choose what types of content you want to hear:") with gr.Row(): music_filter = gr.Checkbox(label="🎡 Music", value=True) news_filter = gr.Checkbox(label="πŸ“° News", value=True) podcast_filter = gr.Checkbox(label="πŸŽ™οΈ Podcasts", value=True) story_filter = gr.Checkbox(label="πŸ“– AI Stories", value=True) save_pref_btn = gr.Button("πŸ’Ύ Save & Go to Radio ▢️", variant="primary", size="lg") pref_status = gr.Textbox(label="Status", interactive=False) # Tab 2: Radio Player with gr.Tab("πŸ“» Radio Player", id=2): gr.Markdown("### 🎧 Start your personalized radio experience!") now_playing = gr.HTML("
πŸ“» Ready to start
") with gr.Row(): start_btn = gr.Button("▢️ Generate & Play", variant="primary", size="lg", elem_classes="control-button") next_btn = gr.Button("⏭️ Next Segment", variant="secondary", size="lg", elem_classes="control-button") stop_btn = gr.Button("⏹️ Stop", variant="stop", size="lg", elem_classes="control-button") with gr.Row(): voice_audio = gr.Audio( label="🎀 Record Your Song Request", type="filepath", sources=["microphone"], format="wav" ) voice_btn = gr.Button("🎀 Ask For Song", variant="primary", size="lg", elem_classes="control-button") voice_status = gr.Textbox(label="Voice Request Status", value="Record your voice request above, then click the button", interactive=False) # Like/Dislike buttons for current track with gr.Row(): like_btn = gr.Button("πŸ‘ Like This Song", variant="secondary", size="sm") dislike_btn = gr.Button("πŸ‘Ž Don't Play Again", variant="secondary", size="sm") like_status = gr.Textbox(label="Rating Status", value="Rate the current song!", interactive=False, scale=2) progress_text = gr.Textbox(label="Progress", value="Ready to start", interactive=False, visible=False) segment_info = gr.Markdown("**Welcome!** Set your preferences and start the radio.", elem_id="segment-display") llm_script = gr.Textbox( label="🧠 Model Script (LLM answer)", value="Model-generated script will appear here for each segment.", interactive=False, lines=6, visible=False ) gr.Markdown("**πŸ’‘ Tip:** Host speech plays first, then music/podcasts will stream automatically!") audio_output = gr.Audio(label="πŸ”Š Host Speech", autoplay=True, type="filepath", elem_id="host_audio") music_player = gr.HTML(label="🎡 Music/Podcast Player (streaming)") # Timer for delayed player loading (start disabled) player_timer = gr.Timer(value=0, active=False) player_timer.tick( fn=show_pending_player, inputs=[], outputs=[music_player, player_timer] ) status_text = gr.Textbox(label="Status", value="Ready", interactive=False, visible=False) # Connect buttons start_btn.click( fn=start_and_play_first_segment, inputs=[], outputs=[status_text, segment_info, audio_output, music_player, progress_text, now_playing, llm_script, player_timer] ) next_btn.click( fn=play_next_segment, inputs=[], outputs=[segment_info, audio_output, music_player, progress_text, now_playing, llm_script, player_timer] ) stop_btn.click( fn=stop_radio, inputs=[], outputs=[status_text, audio_output, music_player, progress_text, now_playing, player_timer], js="() => { if(window.cancelNextSegment) window.cancelNextSegment(); }" ) # Voice input button - process recorded audio voice_btn.click( fn=handle_voice_request, inputs=[voice_audio], outputs=[voice_status, audio_output, music_player, player_timer] ) # Like/Dislike buttons like_btn.click( fn=like_current_track, inputs=[], outputs=[like_status] ) dislike_btn.click( fn=dislike_current_track, inputs=[], outputs=[like_status] ) # Connect save preferences button (needs to be after both tabs are defined) save_pref_btn.click( fn=save_preferences, inputs=[name_input, genres_input, interests_input, podcast_input, mood_input, music_filter, news_filter, podcast_filter, story_filter, voice_input], outputs=[pref_status] ).then( fn=lambda: gr.Tabs(selected=2), # Go to Radio Player tab (id=2) outputs=[tabs] ) # Auth button handlers def handle_login(passphrase): """Handle login with passphrase - returns all data including preferences""" message, user_id, pp = authenticate_with_passphrase(passphrase) if user_id: # Get preferences to autofill prefs = get_saved_preferences() # Returns: message, user_id, passphrase, passphrase_display, auth_status_md, # name, genres, interests, podcasts, mood, music, news, podcast, story, voice return (message, user_id, pp, pp, f"βœ… Logged in! Passphrase: **{pp}**", prefs[0], prefs[1], prefs[2], prefs[3], prefs[4], prefs[5], prefs[6], prefs[7], prefs[8], prefs[9]) # Failed login - return empty/default values return (message, "", "", "", "*Login failed*", "Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy", True, True, True, True, "Rachel (Female)") def handle_create_account(): """Handle new account creation""" message, user_id, passphrase = create_new_account() # Return with passphrase display return message, user_id, passphrase, passphrase def handle_init_from_cookie(stored_user_id): """Initialize user from cookie-stored user ID - returns preferences too""" if stored_user_id and stored_user_id.strip(): user_id, passphrase, is_new = init_user(stored_user_id) prefs = get_saved_preferences() if is_new: message = f"""πŸŽ‰ Welcome to AI Radio! Your personal passphrase is: πŸ”‘ **{passphrase}** ⚠️ **Save this passphrase!** You'll need it to access your account from other devices.""" else: stats = user_memory.get_user_stats(user_id) message = f"""βœ… Welcome back! Your passphrase: **{passphrase}** πŸ“Š Your stats: β€’ Liked tracks: {stats.get('total_likes', 0)} β€’ Play history: {stats.get('total_plays', 0)} tracks""" # Return: message, user_id, passphrase, passphrase_display, # name, genres, interests, podcasts, mood, music, news, podcast, story, voice return (message, user_id, passphrase, passphrase, prefs[0], prefs[1], prefs[2], prefs[3], prefs[4], prefs[5], prefs[6], prefs[7], prefs[8], prefs[9]) else: # No cookie - return defaults return ("*No saved account found. Create a new account or log in with your passphrase.*", "", "", "", "Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy", True, True, True, True, "Rachel (Female)") # Login button - single handler that returns everything login_btn.click( fn=handle_login, inputs=[passphrase_input], outputs=[login_status, auth_user_id, auth_passphrase, passphrase_display, auth_status, name_input, genres_input, interests_input, podcast_input, mood_input, music_filter, news_filter, podcast_filter, story_filter, voice_input] ).then( # Save to localStorage via JS fn=None, inputs=[auth_user_id, auth_passphrase], outputs=[], js="(userId, passphrase) => { if(userId && passphrase) { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); } }" ).then( # Go to Radio Player tab after successful login fn=lambda uid: gr.Tabs(selected=2) if uid else gr.Tabs(selected=0), inputs=[auth_user_id], outputs=[tabs] ) # Create account button create_account_btn.click( fn=handle_create_account, inputs=[], outputs=[auth_status, auth_user_id, auth_passphrase, passphrase_display] ).then( # Save to localStorage via JS fn=None, inputs=[auth_user_id, auth_passphrase], outputs=[], js="(userId, passphrase) => { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); }" ).then( # Go to preferences tab fn=lambda: gr.Tabs(selected=1), outputs=[tabs] ) # Initialize from localStorage on page load demo.load( fn=None, inputs=[], outputs=[auth_user_id], js="() => { return localStorage.getItem('ai_radio_user_id') || ''; }" ).then( fn=handle_init_from_cookie, inputs=[auth_user_id], outputs=[auth_status, auth_user_id, auth_passphrase, passphrase_display, name_input, genres_input, interests_input, podcast_input, mood_input, music_filter, news_filter, podcast_filter, story_filter, voice_input] ).then( # Save to localStorage (in case new user was created) fn=None, inputs=[auth_user_id, auth_passphrase], outputs=[], js="(userId, passphrase) => { if(userId && passphrase) { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); } }" ) # Tab 3: Liked Tracks with gr.Tab("❀️ Liked Tracks", id=3): gr.Markdown("### ❀️ Your Liked Tracks") gr.Markdown("Songs you've liked will be played again sometimes! (30% chance during music segments)") liked_display = gr.Markdown("No liked tracks yet. Like some songs while listening!") user_stats_display = gr.Markdown("") refresh_liked_btn = gr.Button("πŸ”„ Refresh Liked Tracks", variant="secondary") refresh_liked_btn.click( fn=get_liked_tracks_display, inputs=[], outputs=[liked_display] ).then( fn=get_user_stats_display, inputs=[], outputs=[user_stats_display] ) # Tab 4: Statistics with gr.Tab("πŸ“Š Your Stats", id=4): gr.Markdown("### πŸ“ˆ Your Listening Statistics") stats_display = gr.Markdown(elem_id="stats-panel") refresh_stats_btn = gr.Button("πŸ”„ Refresh Stats", variant="secondary") refresh_stats_btn.click( fn=get_stats, inputs=[], outputs=[stats_display] ) # Load stats on tab open demo.load( fn=get_stats, inputs=[], outputs=[stats_display] ) # Tab 5: About with gr.Tab("ℹ️ About", id=5): gr.Markdown(""" # 🎡 AI Radio - Your Personal Radio Station ## About This App AI Radio is an intelligent, personalized radio station powered by cutting-edge AI technology. It creates a unique listening experience tailored to your preferences, mood, and interests. ## 🌟 Features - **🎡 Personalized Music**: Curated tracks from YouTube based on your favorite genres and mood - **πŸ“° Custom News**: Real-time news updates on topics you care about - **πŸŽ™οΈ Podcast Recommendations**: Discover interesting podcasts matching your interests - **πŸ“– AI-Generated Stories**: Entertaining stories and fun facts - **πŸ€– AI Host**: Dynamic AI radio host that introduces segments (choose your preferred voice!) - **πŸ’Ύ Smart Recommendations**: RAG system learns from your listening history - **πŸ” User Accounts**: Passphrase-based authentication with saved preferences ## 🧠 How It Works ### MCP Servers (Modular Tools) Three specialized **MCP (Model Context Protocol)** servers act as tools: - **Music Server**: Searches YouTube for music tracks matching your preferences - **News Server**: Fetches real-time news from RSS feeds - **Podcast Server**: Discovers podcasts on YouTube ### LLM (Large Language Model) **Nebius GPT-OSS-120B** generates all text content: - Personalized host commentary and introductions - Conversational news scripts - Entertaining stories and fun facts - All adapted to your mood and preferences ### RAG System (Retrieval-Augmented Generation) **LlamaIndex-powered RAG** provides context-aware personalization: - Stores your preferences and listening history - Retrieves context for better recommendations - Learns from your behavior to improve suggestions over time ## πŸ› οΈ Technology Stack - **Gradio**: Beautiful, interactive UI - **Nebius GPT-OSS-120B** (OpenAI-compatible): LLM for content generation - **ElevenLabs**: High-quality text-to-speech for voice generation - **LlamaIndex**: RAG system for personalized recommendations - **MCP Servers**: Modular tools for music, news, and podcasts - **yt-dlp**: YouTube music and podcast search - **YouTube/SoundCloud**: Music and podcast streaming ## πŸ† Built for MCP 1st Birthday Competition This app demonstrates: - βœ… **Autonomous Agent Behavior**: Planning, reasoning, and execution - βœ… **MCP Servers as Tools**: Modular music, news, and podcast servers - βœ… **RAG System**: Context-aware personalization with LlamaIndex - βœ… **LLM Integration**: Content generation with Nebius GPT-OSS-120B - βœ… **Gradio Interface**: Seamless user experience ## πŸ“ How to Use 1. **Create Account**: Get your unique passphrase (saved in browser) 2. **Set Preferences**: Choose genres, interests, and mood 3. **Start Radio**: Click "Generate & Play" to begin your personalized show 4. **Interact**: Like/dislike tracks, request songs by voice 5. **Track Stats**: View your listening history and statistics --- Made with ❀️ for the MCP 1st Birthday Competition """) # Footer gr.HTML("""

🎡 AI Radio - Personalized Radio for Everyone 🎡

Track: MCP in Action - Consumer Applications

""") # Launch the app # Note: For HuggingFace Spaces, use app.py at root level instead if __name__ == "__main__": import os port = int(os.getenv("PORT", 7871)) demo.launch( server_name="0.0.0.0", server_port=port, share=False )