AI-RADIO / src /app.py
Nikita Makarov
Add debug logging for podcast iframe creation
131e39a
"""AI Radio - Personalized Radio Station with AI Host"""
import gradio as gr
import os
import json
import time
import re
from typing import Dict, Any, List, Optional
import threading
from pydub import AudioSegment
# Get project root directory (parent of src/)
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
AUDIO_DIR = os.path.join(PROJECT_ROOT, "audio")
LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
os.makedirs(AUDIO_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
from config import get_config
from radio_agent import RadioAgent
from tts_service import TTSService
from rag_system import RadioRAGSystem
from voice_input import VoiceInputService
from user_memory import UserMemoryService
def create_youtube_player_html(youtube_id: str, title: str, artist: str, url: str) -> str:
"""Create YouTube player iframe HTML"""
return f"""
<iframe width="100%" height="500"
src="https://www.youtube.com/embed/{youtube_id}?autoplay=1"
allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture"
allowfullscreen>
</iframe>
"""
def create_podcast_player_html(youtube_id: str, title: str, host: str, url: str) -> str:
"""Create podcast YouTube player iframe HTML"""
return f"""
<iframe width="100%" height="500"
src="https://www.youtube.com/embed/{youtube_id}?autoplay=1"
allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture"
allowfullscreen>
</iframe>
"""
def create_placeholder_html(title: str, artist: str, delay_s: float, media_type: str = "music") -> str:
"""Create placeholder HTML shown while waiting for host speech to end"""
bg_color = "#1a1a2e" if media_type == "music" else "#6b46c1"
icon = "🎡" if media_type == "music" else "πŸŽ™οΈ"
return f"""
<div style="padding: 1rem; background: linear-gradient(135deg, {bg_color} 0%, #16213e 100%); border-radius: 12px; margin: 1rem 0; box-shadow: 0 4px 15px rgba(0,0,0,0.3);">
<h4 style="margin: 0 0 0.75rem 0; color: #fff; font-size: 1.1em;">{icon} {title}</h4>
<p style="margin: 0 0 0.75rem 0; color: #aaa; font-size: 0.9em;">by {artist}</p>
<div style="position: relative; width: 100%; padding-bottom: 56.25%; border-radius: 8px; overflow: hidden; background: #000; display: flex; align-items: center; justify-content: center;">
<div style="position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); text-align: center; color: #fff;">
<div style="font-size: 3em; margin-bottom: 0.5rem;">⏳</div>
<p style="margin: 0; font-size: 0.9em;">Video starts in {delay_s:.0f}s...</p>
<p style="margin: 0.5rem 0 0 0; font-size: 0.8em; color: #888;">After host finishes speaking</p>
</div>
</div>
</div>
"""
def get_pending_player():
"""Check if there's a pending player ready to show"""
pending = radio_state.get("pending_player")
if pending and time.time() >= pending.get("ready_at", 0):
html = pending.get("html", "")
radio_state["pending_player"] = None # Clear it
return html
return None
# Global state
radio_state = {
"is_playing": False,
"is_stopped": False, # User explicitly stopped (vs just paused)
"current_segment_index": 0,
"planned_show": [],
"user_preferences": {},
"stop_flag": False,
"current_news_batches": [],
"news_total_batches": 0,
"news_batches_played": 0,
"last_segment": None,
"current_track": None, # Currently playing track for like/dislike
"user_id": None, # Current user ID
"auto_play": True, # Auto-play next segment when current finishes
"content_filter": {
"music": True,
"news": True,
"podcasts": True,
"stories": True
},
# Pending media player (to be added after host speech ends)
"pending_player": None, # {"html": "...", "ready_at": timestamp}
}
# Initialize services
config = get_config()
agent = RadioAgent(config)
# Use open-source local TTS instead of ElevenLabs
tts_service = TTSService(api_key=getattr(config, "elevenlabs_api_key", ""), voice_id=getattr(config, "elevenlabs_voice_id", ""))
voice_input_service = VoiceInputService()
user_memory = UserMemoryService()
def save_preferences(name: str, favorite_genres: List[str], interests: List[str],
podcast_interests: List[str], mood: str,
music_filter: bool, news_filter: bool, podcast_filter: bool, story_filter: bool,
voice_name: str = None) -> str:
"""Save user preferences to RAG system"""
# Initialize user if not already done
if not radio_state.get("user_id"):
user_id, _, _ = user_memory.get_or_create_user()
radio_state["user_id"] = user_id
print(f"πŸ‘€ User initialized: {radio_state['user_id']}")
# Map voice name to voice_id if needed
from tts_service import VOICE_OPTIONS
selected_voice_id = "21m00Tcm4TlvDq8ikWAM" # Default Rachel
if voice_name and voice_name in VOICE_OPTIONS:
selected_voice_id = VOICE_OPTIONS[voice_name]
preferences = {
"name": name or "Friend",
"favorite_genres": favorite_genres or ["pop"],
"interests": interests or ["world"],
"podcast_interests": podcast_interests or ["technology"],
"mood": mood or "happy",
"voice_id": selected_voice_id,
"voice_name": voice_name or "Rachel (Female)",
"content_filter": {
"music": music_filter,
"news": news_filter,
"podcasts": podcast_filter,
"stories": story_filter
},
"timestamp": time.time()
}
# Update content filter
radio_state["content_filter"] = preferences["content_filter"]
radio_state["user_preferences"] = preferences
# Update TTS service with new voice
global tts_service
tts_service.voice_id = selected_voice_id
print(f"🎀 Voice updated to: {voice_name} (ID: {selected_voice_id})")
# Save to RAG system with user_id
user_id = radio_state.get("user_id")
agent.rag_system.store_user_preferences(preferences, user_id=user_id)
# Save to user memory for persistence
if user_id:
user_memory.save_user_preferences(user_id, preferences)
print(f"πŸ’Ύ Preferences saved for user {user_id}")
return f"βœ… Preferences saved! Welcome, {preferences['name']}! Your personalized radio is ready."
def get_saved_preferences() -> tuple:
"""Get saved preferences for the current user to autofill the form
Returns:
Tuple of (name, genres, interests, podcast_interests, mood, music, news, podcast, story, voice_name)
"""
user_id = radio_state.get("user_id")
if not user_id:
# Return defaults
return "Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy", True, True, True, True, "Rachel (Female)"
prefs = user_memory.get_user_preferences(user_id)
if not prefs:
return "Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy", True, True, True, True, "Rachel (Female)"
# Extract content filter
content_filter = prefs.get("content_filter", {})
# Get voice name (prefer stored voice_name, otherwise map from voice_id)
voice_name = prefs.get("voice_name", "Rachel (Female)")
if not voice_name or voice_name not in ["Rachel (Female)", "Lera (Female)", "Bella (Female)", "Antoni (Male)", "Arnold (Male)", "Adam (Male)", "Domi (Female)", "Elli (Female)", "Josh (Male)", "Sam (Male)"]:
# Try to map voice_id back to name
from tts_service import VOICE_OPTIONS
voice_id = prefs.get("voice_id", "21m00Tcm4TlvDq8ikWAM")
for name, vid in VOICE_OPTIONS.items():
if vid == voice_id:
voice_name = name
break
else:
voice_name = "Rachel (Female)" # Default fallback
return (
prefs.get("name", "Friend"),
prefs.get("favorite_genres", ["pop", "rock"]),
prefs.get("interests", ["technology", "world"]),
prefs.get("podcast_interests", ["technology"]),
prefs.get("mood", "happy"),
content_filter.get("music", True),
content_filter.get("news", True),
content_filter.get("podcasts", True),
content_filter.get("stories", True),
voice_name
)
def start_radio_stream():
"""Start the radio stream"""
# Use saved preferences, or set defaults if none exist
if not radio_state["user_preferences"]:
# Try to load from user memory
user_id = radio_state.get("user_id")
if user_id:
saved = user_memory.get_user_preferences(user_id)
if saved:
radio_state["user_preferences"] = saved
radio_state["content_filter"] = saved.get("content_filter", {
"music": True, "news": True, "podcasts": True, "stories": True
})
# Still no preferences? Use defaults
if not radio_state["user_preferences"]:
radio_state["user_preferences"] = {
"name": "Friend",
"favorite_genres": ["pop", "rock"],
"interests": ["technology", "world"],
"podcast_interests": ["technology"],
"mood": "happy",
"voice_id": "21m00Tcm4TlvDq8ikWAM",
"voice_name": "Rachel (Female)"
}
radio_state["content_filter"] = {
"music": True, "news": True, "podcasts": True, "stories": True
}
if radio_state["is_playing"]:
return "πŸ“» Radio is already playing!", None, None, None, ""
# Plan the show with content filter
user_id = radio_state.get("user_id")
show_plan = agent.plan_radio_show(
user_id=user_id,
user_preferences=radio_state["user_preferences"],
duration_minutes=30,
content_filter=radio_state["content_filter"]
)
radio_state["planned_show"] = show_plan
radio_state["current_segment_index"] = 0
radio_state["is_playing"] = True
radio_state["stop_flag"] = False
radio_state["current_news_batches"] = []
radio_state["news_total_batches"] = 0
radio_state["news_batches_played"] = 0
radio_state["last_segment"] = None
return "🎡 Starting your personalized radio show...", None, None, None, ""
def show_pending_player():
"""Called by timer to finally show the media player"""
player_html = radio_state.get("pending_player_html", "")
print("⏰ Timer fired: Showing pending media player now")
# Return player HTML and disable timer
return player_html, gr.Timer(value=0, active=False)
def start_and_play_first_segment():
"""
One-shot helper for the UI:
- If resuming from stop: continue from next segment
- If fresh start: Plan the show and play first segment
Returns everything needed to update the UI in a single call.
"""
print("▢️ [start_and_play_first_segment] Starting...")
# Initialize user if not already done
if not radio_state.get("user_id"):
user_id, _, _ = user_memory.get_or_create_user()
radio_state["user_id"] = user_id
print(f"πŸ‘€ User initialized: {radio_state['user_id']}")
# Use saved preferences, or set defaults if none exist
if not radio_state["user_preferences"]:
# Try to load from user memory
user_id = radio_state.get("user_id")
if user_id:
saved = user_memory.get_user_preferences(user_id)
if saved:
radio_state["user_preferences"] = saved
radio_state["content_filter"] = saved.get("content_filter", {
"music": True, "news": True, "podcasts": True, "stories": True
})
# Update TTS service with saved voice
voice_id = saved.get("voice_id", "21m00Tcm4TlvDq8ikWAM")
tts_service.voice_id = voice_id
print(f"πŸ“‚ Loaded saved preferences for user {user_id}")
print(f"🎀 Voice set to: {saved.get('voice_name', 'Rachel (Female)')} (ID: {voice_id})")
# Still no preferences? Use defaults
if not radio_state["user_preferences"]:
radio_state["user_preferences"] = {
"name": "Friend",
"favorite_genres": ["pop", "rock"],
"interests": ["technology", "world"],
"podcast_interests": ["technology"],
"mood": "happy",
"voice_id": "21m00Tcm4TlvDq8ikWAM",
"voice_name": "Rachel (Female)"
}
radio_state["content_filter"] = {
"music": True, "news": True, "podcasts": True, "stories": True
}
# Update TTS service with default voice
tts_service.voice_id = "21m00Tcm4TlvDq8ikWAM"
print("πŸ“‹ Using default preferences")
# Check if resuming from stopped state (only if show was manually stopped, not completed)
if radio_state.get("is_stopped") and radio_state.get("planned_show") and radio_state["current_segment_index"] < len(radio_state["planned_show"]):
print("▢️ Resuming from stopped state...")
radio_state["is_playing"] = True
radio_state["is_stopped"] = False
radio_state["stop_flag"] = False
# Play next segment (continuing from where we stopped)
segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment()
return (
"▢️ Resuming radio...",
segment_info,
host_audio,
music_html,
progress,
now_playing,
llm_script,
timer_config
)
# If show completed or no valid show to resume, start fresh
# Also check if is_playing is True but show is actually completed
if radio_state.get("is_playing") and radio_state.get("planned_show"):
# Check if show is actually completed
if radio_state["current_segment_index"] >= len(radio_state["planned_show"]):
# Show completed, reset and start fresh
print("πŸ”„ Show completed, resetting for new show...")
reset_radio()
else:
# Show is actually playing
return (
"πŸ“» Radio is already playing!",
"**Welcome!** Set your preferences and start the radio.",
None, None, "Ready to start", "πŸ“» AI Radio",
"Model-generated script will appear here for each segment.",
gr.Timer(value=0, active=False)
)
elif radio_state.get("is_playing"):
# is_playing is True but no planned show - reset
print("πŸ”„ is_playing=True but no planned_show, resetting...")
reset_radio()
import time
t0 = time.time()
# Fresh start - reset and plan new show
reset_radio()
# Step 1: Plan skeleton show (YouTube searches happen here)
print(" [1/3] Planning show (searching YouTube)...")
user_id = radio_state.get("user_id")
show_plan = agent.plan_radio_show(
user_id=user_id,
user_preferences=radio_state["user_preferences"],
duration_minutes=30,
content_filter=radio_state["content_filter"]
)
print(f" [1/3] Done in {time.time()-t0:.1f}s, segments: {len(show_plan)}")
radio_state["planned_show"] = show_plan
radio_state["current_segment_index"] = 0
radio_state["is_playing"] = True
radio_state["stop_flag"] = False
radio_state["is_stopped"] = False
# Step 2 & 3: Generate and play first segment (LLM + TTS inside play_next_segment)
print(" [2/3] Generating first segment (LLM)...")
t1 = time.time()
segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment()
print(f" [2/3] Done in {time.time()-t1:.1f}s")
print(f" [3/3] Total time: {time.time()-t0:.1f}s")
print(f" Host audio file: {host_audio}")
return (
"🎡 Starting your personalized radio show...",
segment_info,
host_audio,
music_html,
progress,
now_playing,
llm_script,
timer_config
)
def play_next_segment():
"""Play the next segment in the show - returns host audio and music audio separately"""
# If stopped, resume first
if radio_state.get("is_stopped"):
print("▢️ Resuming from stopped state in play_next_segment...")
radio_state["is_stopped"] = False
radio_state["stop_flag"] = False
radio_state["is_playing"] = True
if not radio_state["is_playing"]:
# If we have a planned show, try to resume
if radio_state.get("planned_show"):
radio_state["is_playing"] = True
radio_state["stop_flag"] = False
else:
return "⏸️ No show planned. Click 'Generate & Play' to start.", None, None, None, "", "", gr.Timer(value=0, active=False)
if radio_state["stop_flag"]:
radio_state["is_playing"] = False
return "⏸️ Radio paused", None, None, None, "", "", gr.Timer(value=0, active=False)
# If there are remaining news TTS batches, play the next one without advancing to a new segment
if radio_state.get("current_news_batches"):
segment = radio_state.get("last_segment")
if segment:
batch_index = radio_state.get("news_batches_played", 1) + 1
next_batch = radio_state["current_news_batches"].pop(0)
radio_state["news_batches_played"] = batch_index
host_audio_file = None
if getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(next_batch)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_{batch_index}.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
# Build display info (same news segment)
segment_info = format_segment_info(segment)
raw_script = get_segment_text(segment)
script_batches = segment.get("script_batches", segment.get("script", []))
if isinstance(script_batches, list) and script_batches:
display_script = "\n\n---\n\n".join(str(b) for b in script_batches)
else:
display_script = str(raw_script) if raw_script else ""
total = radio_state.get("news_total_batches", 1) or 1
progress = f"News batch {batch_index}/{total}"
# When no more batches, clear counters so next click advances to next segment
if not radio_state["current_news_batches"]:
radio_state["news_total_batches"] = 0
radio_state["news_batches_played"] = 0
return segment_info, host_audio_file, "", progress, get_now_playing(segment), display_script, gr.Timer(value=0, active=False)
if radio_state["current_segment_index"] >= len(radio_state["planned_show"]):
radio_state["is_playing"] = False
return "🎊 Show completed! Hope you enjoyed it.", None, None, None, "", "", gr.Timer(value=0, active=False)
# Get current segment
segment_index = radio_state["current_segment_index"]
segment = radio_state["planned_show"][segment_index]
radio_state["current_segment_index"] += 1
# Lazily generate LLM content for this segment
user_prefs = radio_state["user_preferences"].copy()
user_id = radio_state.get("user_id")
if user_id:
user_prefs["_user_id"] = user_id # Pass user_id in preferences for RAG queries
segment = agent.enrich_segment(segment, user_prefs)
radio_state["planned_show"][segment_index] = segment
# Execute segment (log to RAG)
agent.execute_segment(segment, user_id=user_id)
radio_state["last_segment"] = segment
# Generate content display and underlying script text
segment_info = format_segment_info(segment)
raw_script = get_segment_text(segment)
# Generate TTS for host commentary
host_audio_file = None
music_player_html = ""
if segment["type"] in ["intro", "outro", "story"]:
text = raw_script
if text and getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(text)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
elif segment["type"] == "news":
# Handle batched news generation
script_batches = segment.get("script_batches", segment.get("script", []))
if isinstance(script_batches, list) and script_batches:
# Generate first batch immediately (user doesn't wait)
first_batch = script_batches[0]
if getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(first_batch)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_1.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
# Store remaining batches for sequential playback
segment["remaining_batches"] = script_batches[1:] if len(script_batches) > 1 else []
else:
# Fallback for old format
text = raw_script
if text and getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(text)
if audio_bytes:
host_audio_file = f"segment_{radio_state['current_segment_index']}.mp3"
tts_service.save_audio(audio_bytes, host_audio_file)
elif segment["type"] == "music":
# For music: generate host commentary, then get music
commentary = segment.get("commentary", "")
# Generate host commentary
if commentary and getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(commentary)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_host.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
# Get music track - check if we should play a liked track
track = segment.get("track", {})
user_id = radio_state.get("user_id")
# 30% chance to play a liked track if user has any
if user_id and user_memory.should_play_liked_track(user_id, probability=0.3):
genre = track.get("genre", "") if track else ""
liked_track = user_memory.get_random_liked_track(user_id, genre)
if liked_track:
print(f"❀️ Playing liked track: {liked_track.get('title', 'Unknown')}")
track = liked_track
segment["track"] = liked_track
segment["from_liked"] = True
if track:
# Store current track for like/dislike buttons
radio_state["current_track"] = track
# Add to user history
if user_id:
user_memory.add_to_history(user_id, track)
print(f"🎡 Preparing music: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}")
# Get streaming URL for YouTube or SoundCloud
# Modal proxy returns "youtube_api" as source, so check both
if track.get("source") in ["youtube", "youtube_api"] and track.get("url"):
try:
# Extract YouTube ID
youtube_id = track.get("youtube_id", "")
if not youtube_id and "v=" in track["url"]:
youtube_id = track["url"].split("v=")[-1].split("&")[0]
if youtube_id:
# # COMMENTED OUT: Delayed loading mechanism
# # Get host audio duration to calculate delay
# delay_ms = 0
# delay_s = 0.0
# if host_audio_file and os.path.exists(host_audio_file):
# try:
# audio = AudioSegment.from_file(host_audio_file)
# duration_ms = len(audio)
# delay_ms = max(0, duration_ms - 3000) # Start 3 seconds before end
# delay_s = delay_ms / 1000.0
# print(f"⏱️ Host audio: {duration_ms}ms, YouTube delay: {delay_ms}ms")
# except Exception as e:
# print(f"⚠️ Could not get audio duration: {e}")
# # Store the actual iframe HTML for later (after host speech ends)
# actual_player_html = create_youtube_player_html(
# youtube_id,
# track.get('title', 'Unknown'),
# track.get('artist', 'Unknown'),
# track['url']
# )
# # Schedule the player to be ready after the delay
# radio_state["pending_player"] = {
# "html": actual_player_html,
# "ready_at": time.time() + delay_s,
# "type": "music",
# "title": track.get('title', 'Unknown')
# }
# # Return placeholder initially (no iframe yet!)
# music_player_html = create_placeholder_html(
# track.get('title', 'Unknown'),
# track.get('artist', 'Unknown'),
# delay_s,
# "music"
# )
# print(f"βœ… YouTube player scheduled: {track.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s")
# Simple immediate iframe - start track with host
music_player_html = create_youtube_player_html(
youtube_id,
track.get('title', 'Unknown'),
track.get('artist', 'Unknown'),
track['url']
)
print(f"βœ… YouTube player created immediately: {track.get('title', 'Unknown')} (ID: {youtube_id})")
else:
# Fallback: Just show link
music_player_html = f"""
<div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;">
<h4>🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}</h4>
<p>Click to listen:</p>
<a href="{track['url']}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff0000; color: white; text-decoration: none; border-radius: 5px;">
▢️ Play on YouTube
</a>
</div>
"""
except Exception as e:
print(f"❌ Error getting music: {e}")
# Fallback: Just show YouTube link
music_player_html = f"""
<div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;">
<h4>🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}</h4>
<p>Click to listen:</p>
<a href="{track['url']}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff0000; color: white; text-decoration: none; border-radius: 5px;">
▢️ Play on YouTube
</a>
</div>
"""
elif track.get("source") == "soundcloud" and track.get("url"):
# Simple SoundCloud embed player
try:
music_player_html = f"""
<div style="padding: 1rem; background: linear-gradient(135deg, #ff5500 0%, #ff8800 100%); border-radius: 12px; margin: 1rem 0; box-shadow: 0 4px 15px rgba(0,0,0,0.3);">
<h4 style="margin: 0 0 0.75rem 0; color: #fff; font-size: 1.1em;">🎡 {track.get('title', 'Unknown')}</h4>
<p style="margin: 0 0 0.75rem 0; color: #ffe0cc; font-size: 0.9em;">by {track.get('artist', 'Unknown')}</p>
<iframe width="100%" height="166" scrolling="no" frameborder="no"
style="border-radius: 8px;"
src="https://w.soundcloud.com/player/?url={track['url']}&color=%23ffffff&auto_play=true&hide_related=true&show_comments=false&show_user=true&show_reposts=false&show_teaser=false"></iframe>
<p style="margin-top: 0.75rem; font-size: 0.85em; margin-bottom: 0;">
<a href="{track['url']}" target="_blank" style="color: #fff; text-decoration: none;">πŸ”— Open on SoundCloud</a>
</p>
</div>
"""
print(f"βœ… Streaming from SoundCloud: {track.get('title', 'Unknown')}")
except Exception as e:
print(f"❌ Error with SoundCloud: {e}")
music_player_html = f"""
<div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;">
<h4>🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}</h4>
<a href="{track['url']}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff5500; color: white; text-decoration: none; border-radius: 5px;">
▢️ Play on SoundCloud
</a>
</div>
"""
else:
# Demo track - no actual audio file
print("ℹ️ Demo track (no audio file available)")
music_player_html = ""
elif segment["type"] == "podcast":
# For podcast: generate intro and create YouTube player
intro = segment.get("intro", "")
podcast = segment.get("podcast", {})
# Initialize music_player_html for podcast
music_player_html = ""
if intro and getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(intro)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_host.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
# Create YouTube player for podcast if available
# Modal proxy returns "youtube_api" as source, so check both
print(f"πŸ” Podcast debug: source={podcast.get('source')}, youtube_id={podcast.get('youtube_id')}, url={podcast.get('url', '')[:50]}")
if podcast.get("source") in ["youtube", "youtube_api"] and podcast.get("youtube_id"):
youtube_id = podcast.get("youtube_id", "")
print(f"βœ… Creating podcast iframe with youtube_id: {youtube_id}")
# # COMMENTED OUT: Delayed loading mechanism
# # Get host audio duration to calculate delay
# delay_ms = 0
# if host_audio_file and os.path.exists(host_audio_file):
# try:
# audio = AudioSegment.from_file(host_audio_file)
# duration_ms = len(audio)
# delay_ms = max(0, duration_ms - 3000) # Start 3 seconds before end
# print(f"⏱️ Podcast host audio: {duration_ms}ms, YouTube delay: {delay_ms}ms")
# except Exception as e:
# print(f"⚠️ Could not get audio duration: {e}")
# # Store the actual iframe HTML for later (after host speech ends)
# delay_s = delay_ms / 1000.0
# actual_player_html = create_podcast_player_html(
# youtube_id,
# podcast.get('title', 'Podcast'),
# podcast.get('host', 'Unknown Host'),
# podcast.get('url', '#')
# )
# # Schedule the player to be ready after the delay
# radio_state["pending_player"] = {
# "html": actual_player_html,
# "ready_at": time.time() + delay_s,
# "type": "podcast",
# "title": podcast.get('title', 'Unknown')
# }
# # Return placeholder initially (no iframe yet!)
# music_player_html = create_placeholder_html(
# podcast.get('title', 'Podcast'),
# podcast.get('host', 'Unknown Host'),
# delay_s,
# "podcast"
# )
# print(f"βœ… Podcast YouTube player scheduled: {podcast.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s")
# Simple immediate iframe - start podcast with host
music_player_html = create_podcast_player_html(
youtube_id,
podcast.get('title', 'Podcast'),
podcast.get('host', 'Unknown Host'),
podcast.get('url', '#')
)
print(f"βœ… Podcast YouTube player created immediately: {podcast.get('title', 'Unknown')} (ID: {youtube_id})")
print(f"πŸ“¦ Podcast iframe HTML length: {len(music_player_html)} chars, contains iframe: {'<iframe' in music_player_html}")
elif podcast.get("url") and "youtube" in podcast.get("url", ""):
print(f"⚠️ Podcast: using fallback URL extraction")
# Fallback: extract YouTube ID from URL and create simple embed
url = podcast.get("url", "")
youtube_id = ""
if "v=" in url:
youtube_id = url.split("v=")[-1].split("&")[0]
elif "youtu.be/" in url:
youtube_id = url.split("youtu.be/")[-1].split("?")[0]
if youtube_id:
# # COMMENTED OUT: Delayed loading mechanism
# # Get host audio duration to calculate delay
# delay_ms = 0
# if host_audio_file and os.path.exists(host_audio_file):
# try:
# audio = AudioSegment.from_file(host_audio_file)
# duration_ms = len(audio)
# delay_ms = max(0, duration_ms - 3000)
# except:
# pass
# delay_s = delay_ms / 1000.0
# actual_player_html = create_podcast_player_html(
# youtube_id,
# podcast.get('title', 'Podcast'),
# podcast.get('host', 'Unknown Host'),
# url
# )
# # Schedule the player to be ready after the delay
# radio_state["pending_player"] = {
# "html": actual_player_html,
# "ready_at": time.time() + delay_s,
# "type": "podcast",
# "title": podcast.get('title', 'Unknown')
# }
# # Return placeholder initially
# music_player_html = create_placeholder_html(
# podcast.get('title', 'Podcast'),
# podcast.get('host', 'Unknown Host'),
# delay_s,
# "podcast"
# )
# print(f"βœ… Podcast iframe player scheduled: {podcast.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s")
# Simple immediate iframe
music_player_html = create_podcast_player_html(
youtube_id,
podcast.get('title', 'Podcast'),
podcast.get('host', 'Unknown Host'),
url
)
print(f"βœ… Podcast iframe player created immediately: {podcast.get('title', 'Unknown')} (ID: {youtube_id})")
# Progress info
progress = f"Segment {radio_state['current_segment_index']}/{len(radio_state['planned_show'])}"
# Prepare LLM script for UI: split into smaller segments for readability
display_script = ""
if segment["type"] == "news":
# Use news batches if available
script_batches = segment.get("script_batches", segment.get("script", []))
if isinstance(script_batches, list) and script_batches:
display_script = "\n\n---\n\n".join(str(b) for b in script_batches)
else:
display_script = str(raw_script) if raw_script else ""
else:
if raw_script:
segments = split_text_into_segments(raw_script)
segment["text_segments"] = segments
display_script = "\n\n---\n\n".join(segments)
else:
display_script = ""
# Optionally prefetch next segment's LLM content in the background
def _prefetch_next():
try:
idx = radio_state["current_segment_index"]
if idx < len(radio_state["planned_show"]):
next_seg = radio_state["planned_show"][idx]
enriched = agent.enrich_segment(next_seg, radio_state["user_preferences"])
radio_state["planned_show"][idx] = enriched
except Exception:
pass
threading.Thread(target=_prefetch_next, daemon=True).start()
# Calculate segment duration and set up delayed player if needed
timer_config = gr.Timer(value=0, active=False)
# Only delay player if we have both host audio and a music/podcast player
# Check if music_player_html contains iframe (for YouTube/SoundCloud players)
has_iframe = music_player_html and ("<iframe" in music_player_html)
if host_audio_file and os.path.exists(host_audio_file) and music_player_html and has_iframe:
try:
audio = AudioSegment.from_file(host_audio_file)
host_duration_s = len(audio) / 1000.0
# Start player 3 seconds before host speech ends
delay_s = max(0.5, host_duration_s - 3.0)
print(f"⏱️ Host duration: {host_duration_s:.1f}s, Scheduling player in {delay_s:.1f}s")
# Store the actual player HTML
radio_state["pending_player_html"] = music_player_html
# Return placeholder/empty player and active timer
music_player_html = create_placeholder_html(
segment.get('track', {}).get('title', 'Content') if segment.get('type') == 'music' else segment.get('podcast', {}).get('title', 'Podcast'),
segment.get('track', {}).get('artist', '') if segment.get('type') == 'music' else segment.get('podcast', {}).get('host', ''),
delay_s,
segment.get('type', 'music')
)
timer_config = gr.Timer(value=delay_s, active=True)
except Exception as e:
print(f"⚠️ Error calculating audio duration for delay: {e}")
# Fallback: play immediately (leave music_player_html as is, timer inactive)
# Return: segment_info, host_audio, music_player_html, progress, now_playing, segmented LLM script for UI, timer
return segment_info, host_audio_file, music_player_html, progress, get_now_playing(segment), display_script, timer_config
def stop_radio():
"""Stop the radio stream - pauses at current segment without resetting"""
radio_state["stop_flag"] = True
radio_state["is_playing"] = False
radio_state["is_stopped"] = True
# DON'T reset planned_show or current_segment_index - allow resuming
current_idx = radio_state.get("current_segment_index", 0)
total_segments = len(radio_state.get("planned_show", []))
status_msg = f"⏹️ Stopped at segment {current_idx}/{total_segments}. Click 'Generate & Play' or 'Next Segment' to continue."
# Return status, clear audio, clear music player, progress, now playing, timer
return (
status_msg,
None, # Clear audio
"", # Clear music player HTML
f"Stopped at {current_idx}/{total_segments}",
f"⏸️ Paused - Segment {current_idx}/{total_segments}",
gr.Timer(value=0, active=False) # Stop timer
)
def reset_radio():
"""Fully reset the radio - called when starting fresh"""
radio_state["stop_flag"] = False
radio_state["is_playing"] = False
radio_state["is_stopped"] = False
radio_state["planned_show"] = []
radio_state["current_segment_index"] = 0
radio_state["current_news_batches"] = []
radio_state["news_total_batches"] = 0
radio_state["news_batches_played"] = 0
def format_segment_info(segment: Dict[str, Any]) -> str:
"""Format segment information for display"""
seg_type = segment["type"]
if seg_type == "intro":
return f"πŸŽ™οΈ **Welcome to AI Radio!**\n\n{segment['content']}"
elif seg_type == "outro":
return f"πŸ‘‹ **Thanks for Listening!**\n\n{segment['content']}"
elif seg_type == "music":
track = segment.get("track", {})
if track:
music_info = f"""🎡 **Now Playing**
**{track['title']}**
by {track['artist']}
Genre: {track['genre']}
Duration: {track.get('duration', 'Unknown')}s
*{segment.get('commentary', '')}*
"""
# Add YouTube link if available
if segment.get("_youtube_url"):
music_info += f"\n\nπŸ”— [Listen on YouTube]({segment['_youtube_url']})"
elif track.get("url") and "youtube" in track.get("url", ""):
music_info += f"\n\nπŸ”— [Listen on YouTube]({track['url']})"
return music_info
return "🎡 Music Time!"
elif seg_type == "news":
news_items = segment.get("news_items", [])
news_text = "πŸ“° **News Update**\n\n"
# Handle batched script (list) or old format (string)
script = segment.get("script", segment.get("script_batches", ""))
if isinstance(script, list):
# Join all batches for display
news_text += " ".join(script)
else:
# Old format - string
news_text += str(script) if script else ""
news_text += "\n\n**Headlines:**\n"
for item in news_items[:2]:
news_text += f"\nβ€’ {item.get('title', 'No title')}"
return news_text
elif seg_type == "podcast":
podcast = segment.get("podcast", {})
if podcast:
return f"""πŸŽ™οΈ **Podcast Recommendation**
**{podcast['title']}**
Hosted by {podcast['host']}
{podcast['description']}
Duration: {podcast['duration']}
Rating: {'⭐' * int(podcast.get('rating', 4))}
*{segment.get('intro', '')}*
"""
return "πŸŽ™οΈ Podcast Time!"
elif seg_type == "story":
return f"πŸ“– **Story Time**\n\n{segment.get('content', '')}"
return "πŸ“» Radio Segment"
def get_segment_text(segment: Dict[str, Any]) -> str:
"""Extract text for TTS from segment"""
seg_type = segment["type"]
if seg_type in ["intro", "outro", "story"]:
return str(segment.get("content", ""))
elif seg_type == "news":
# Handle batched news
script_batches = segment.get("script_batches", segment.get("script", []))
if isinstance(script_batches, list):
if script_batches:
return " ".join(str(batch) for batch in script_batches) # Join all batches
return ""
return str(script_batches) if script_batches else "" # Fallback
elif seg_type == "music":
return str(segment.get("commentary", ""))
elif seg_type == "podcast":
return str(segment.get("intro", ""))
return ""
def split_text_into_segments(text: str, max_sentences: int = 2) -> List[str]:
"""Split long LLM answers into smaller sentence-based segments for UI display."""
if not text:
return []
# Split by sentence endings, keep it simple
sentences = re.split(r'(?<=[.!?])\s+', text.strip())
segments: List[str] = []
current: List[str] = []
for s in sentences:
s = s.strip()
if not s:
continue
current.append(s)
if len(current) >= max_sentences:
segments.append(" ".join(current))
current = []
if current:
segments.append(" ".join(current))
return segments
def handle_voice_request(audio_file):
"""Handle voice input request for song from uploaded audio file"""
if not audio_file:
return "⚠️ Please record your voice request first!", None, "", gr.Timer(value=0, active=False)
try:
# Use speech_recognition to process the audio file
import speech_recognition as sr
from pydub import AudioSegment
recognizer = sr.Recognizer()
# Convert audio to WAV format if needed (speech_recognition requires WAV)
audio_path = audio_file
if isinstance(audio_file, tuple):
# Gradio Audio returns (sample_rate, audio_data) or filepath
audio_path = audio_file[1] if len(audio_file) > 1 else audio_file[0]
# If it's not a WAV file, convert it
if audio_path and not audio_path.endswith('.wav'):
try:
# Load and convert to WAV
audio = AudioSegment.from_file(audio_path)
wav_path = audio_path.rsplit('.', 1)[0] + '.wav'
audio.export(wav_path, format="wav")
audio_path = wav_path
except Exception as e:
print(f"⚠️ Could not convert audio: {e}, trying original file")
# Load audio file for recognition
try:
with sr.AudioFile(audio_path) as source:
audio = recognizer.record(source)
except Exception as e:
# Try with pydub conversion first
try:
audio_seg = AudioSegment.from_file(audio_path)
wav_temp = os.path.join(AUDIO_DIR, f"temp_voice_{int(time.time())}.wav")
audio_seg.export(wav_temp, format="wav")
with sr.AudioFile(wav_temp) as source:
audio = recognizer.record(source)
audio_path = wav_temp
except Exception as conv_e:
return f"❌ Could not process audio file: {conv_e}. Please try recording again.", None, "", gr.Timer(value=0, active=False)
# Recognize speech using Google's API
try:
recognized_text = recognizer.recognize_google(audio)
print(f"🎀 Recognized: {recognized_text}")
except sr.UnknownValueError:
return "❌ Could not understand audio. Please speak clearly and try again.", None, "", gr.Timer(value=0, active=False)
except sr.RequestError as e:
return f"❌ Error with speech recognition service: {e}. Please try again.", None, "", gr.Timer(value=0, active=False)
# Process the request
song_request = voice_input_service.process_song_request(recognized_text)
print(f"🎀 Processed request: {song_request}")
# Search for music
tracks = agent.music_server.search_by_request(song_request)
if not tracks:
return f"❌ Could not find music for: '{recognized_text}'. Try saying something like 'play pop music' or 'play a song by [artist name]'!", None, "", gr.Timer(value=0, active=False)
# Get the first matching track
track = tracks[0]
print(f"🎡 Selected track: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}")
# Store current track for like/dislike buttons
radio_state["current_track"] = track
# Add to user history
user_id = radio_state.get("user_id")
if user_id:
user_memory.add_to_history(user_id, track)
# Generate host response using LLM
user_prefs = radio_state.get("user_preferences", {})
host_response = agent.generate_song_request_response(recognized_text, track, user_prefs)
# Generate TTS for host response
audio_file = None
print(f"πŸŽ™οΈ Generating TTS for: {host_response[:50]}...")
if getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(host_response)
if audio_bytes:
audio_file = os.path.join(AUDIO_DIR, f"voice_request_{int(time.time())}.mp3")
tts_service.save_audio(audio_bytes, audio_file)
print(f"βœ… Audio saved to: {audio_file} ({len(audio_bytes)} bytes)")
else:
print("❌ TTS returned no audio bytes")
else:
print("❌ TTS service not available")
# Create music player HTML with volume fading (0β†’30% during host speech, then 30%β†’100%)
music_player_html = ""
player_id = f"voice_player_{int(time.time())}"
timer_config = gr.Timer(value=0, active=False)
# Modal proxy returns "youtube_api" as source, so check both
if track.get("source") in ["youtube", "youtube_api"]:
youtube_id = track.get("youtube_id", "")
if not youtube_id and "v=" in track.get("url", ""):
youtube_id = track["url"].split("v=")[-1].split("&")[0]
if youtube_id:
# Delayed loading mechanism with gr.Timer
try:
# Get host audio duration to calculate delay
delay_s = 0.5
if audio_file and os.path.exists(audio_file):
audio = AudioSegment.from_file(audio_file)
duration_ms = len(audio)
delay_s = max(0.5, (duration_ms / 1000.0) - 3.0) # Start 3 seconds before end
print(f"⏱️ Voice request audio: {duration_ms}ms, YouTube delay: {delay_s:.1f}s")
# Create the actual player HTML
actual_player_html = create_youtube_player_html(
youtube_id,
track.get('title', 'Unknown'),
track.get('artist', 'Unknown'),
track['url']
)
# Store for timer callback
radio_state["pending_player_html"] = actual_player_html
# Create placeholder
music_player_html = create_placeholder_html(
track.get('title', 'Unknown'),
track.get('artist', 'Unknown'),
delay_s,
"music"
)
# Activate timer
timer_config = gr.Timer(value=delay_s, active=True)
print(f"βœ… Voice request: Player scheduled in {delay_s:.1f}s")
except Exception as e:
print(f"⚠️ Error calculating delay for voice request: {e}")
# Fallback: immediate play
music_player_html = create_youtube_player_html(
youtube_id,
track.get('title', 'Unknown'),
track.get('artist', 'Unknown'),
track['url']
)
else:
music_player_html = f"""
<div style="padding: 1rem; background: #f0f0f0; border-radius: 10px; margin: 1rem 0;">
<h4>🎡 {track.get('title', 'Unknown')} - {track.get('artist', 'Unknown')}</h4>
<a href="{track['url']}" target="_blank" style="display: inline-block; padding: 0.5rem 1rem; background: #ff0000; color: white; text-decoration: none; border-radius: 5px;">
▢️ Play on YouTube
</a>
</div>
"""
elif track.get("source") == "soundcloud":
# SoundCloud embed with simple iframe (no script for volume control)
music_player_html = f"""
<div style="padding: 1rem; background: linear-gradient(135deg, #ff5500 0%, #ff8800 100%); border-radius: 12px; margin: 1rem 0; box-shadow: 0 4px 15px rgba(0,0,0,0.3);">
<h4 style="margin: 0 0 0.75rem 0; color: #fff; font-size: 1.1em;">🎡 {track.get('title', 'Unknown')}</h4>
<p style="margin: 0 0 0.75rem 0; color: #ffe0cc; font-size: 0.9em;">by {track.get('artist', 'Unknown')}</p>
<iframe width="100%" height="166" scrolling="no" frameborder="no"
style="border-radius: 8px;"
src="https://w.soundcloud.com/player/?url={track['url']}&color=%23ffffff&auto_play=true&hide_related=true&show_comments=false&show_user=true&show_reposts=false&show_teaser=false">
</iframe>
<p style="margin-top: 0.75rem; font-size: 0.85em; margin-bottom: 0;">
<a href="{track['url']}" target="_blank" style="color: #fff; text-decoration: none;">πŸ”— Open on SoundCloud</a>
</p>
</div>
"""
# Note: Gradio sanitizes script tags, so autoplay scripts don't work
# The browser may block autoplay - user may need to click play
# Return with player
return f"βœ… {host_response}\n\n🎡 Playing: {track['title']} by {track['artist']}", audio_file, music_player_html, timer_config
except Exception as e:
return f"❌ Error processing voice request: {e}", None, "", gr.Timer(value=0, active=False)
def get_now_playing(segment: Dict[str, Any]) -> str:
"""Get now playing text"""
seg_type = segment["type"]
if seg_type == "music":
track = segment.get("track", {})
if track:
return f"β™ͺ {track['title']} - {track['artist']}"
elif seg_type == "news":
return "πŸ“° News Update"
elif seg_type == "podcast":
podcast = segment.get("podcast", {})
if podcast:
return f"πŸŽ™οΈ {podcast['title']}"
elif seg_type == "story":
return "πŸ“– Story Time"
elif seg_type == "intro":
return "πŸŽ™οΈ Welcome!"
elif seg_type == "outro":
return "πŸ‘‹ Goodbye!"
return "πŸ“» AI Radio"
def init_user(stored_user_id: str = None) -> tuple:
"""Initialize user from stored ID or create new one
Args:
stored_user_id: User ID from browser localStorage/cookie
Returns:
Tuple of (user_id, passphrase, is_new_user)
"""
user_id, passphrase, is_new = user_memory.get_or_create_user(stored_user_id)
radio_state["user_id"] = user_id
# Load user preferences if they exist
saved_prefs = user_memory.get_user_preferences(user_id)
if saved_prefs:
radio_state["user_preferences"] = saved_prefs
return user_id, passphrase, is_new
def authenticate_with_passphrase(passphrase: str) -> tuple:
"""Authenticate user with passphrase
Args:
passphrase: User's passphrase (e.g. "swift-tiger-aurora")
Returns:
Tuple of (success_message, user_id, passphrase)
"""
if not passphrase or not passphrase.strip():
return "⚠️ Please enter your passphrase", "", ""
passphrase = passphrase.strip().lower()
user_id = user_memory.get_user_by_passphrase(passphrase)
if user_id:
radio_state["user_id"] = user_id
# Load user preferences
saved_prefs = user_memory.get_user_preferences(user_id)
if saved_prefs:
radio_state["user_preferences"] = saved_prefs
stats = user_memory.get_user_stats(user_id)
return f"βœ… Welcome back! Logged in successfully.\n\nπŸ“Š Your stats:\nβ€’ Liked tracks: {stats.get('total_likes', 0)}\nβ€’ Play history: {stats.get('total_plays', 0)} tracks", user_id, passphrase
else:
return "❌ Invalid passphrase. Please check and try again.", "", ""
def create_new_account() -> tuple:
"""Create a new user account
Returns:
Tuple of (message, user_id, passphrase)
"""
user_id, passphrase, _ = user_memory.get_or_create_user()
radio_state["user_id"] = user_id
message = f"""πŸŽ‰ Welcome to AI Radio!
Your personal passphrase is:
πŸ”‘ **{passphrase}**
⚠️ **Save this passphrase!** You'll need it to access your account from other devices or if cookies are cleared."""
return message, user_id, passphrase
def like_current_track() -> str:
"""Like the currently playing track"""
user_id = radio_state.get("user_id")
track = radio_state.get("current_track")
if not user_id:
return "⚠️ User not initialized"
if not track:
return "⚠️ No track currently playing"
if user_memory.like_track(user_id, track):
return f"πŸ‘ Liked: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}"
return "❌ Could not save like"
def dislike_current_track() -> str:
"""Dislike the currently playing track"""
user_id = radio_state.get("user_id")
track = radio_state.get("current_track")
if not user_id:
return "⚠️ User not initialized"
if not track:
return "⚠️ No track currently playing"
if user_memory.dislike_track(user_id, track):
return f"πŸ‘Ž Won't play again: {track.get('title', 'Unknown')}"
return "❌ Could not save dislike"
def get_liked_tracks_display() -> str:
"""Get formatted list of liked tracks for display"""
user_id = radio_state.get("user_id")
if not user_id:
return "No liked tracks yet. Start listening and like some songs!"
liked = user_memory.get_liked_tracks(user_id)
if not liked:
return "No liked tracks yet. Like some songs while listening!"
lines = ["### ❀️ Your Liked Tracks\n"]
for i, track in enumerate(liked[-20:], 1): # Show last 20
lines.append(f"{i}. **{track.get('title', 'Unknown')}** - {track.get('artist', 'Unknown')}")
if len(liked) > 20:
lines.append(f"\n*...and {len(liked) - 20} more liked tracks*")
return "\n".join(lines)
def get_user_stats_display() -> str:
"""Get user stats for display"""
user_id = radio_state.get("user_id")
if not user_id:
return "Start listening to see your stats!"
stats = user_memory.get_user_stats(user_id)
liked_count = len(user_memory.get_liked_tracks(user_id))
return f"""### πŸ“Š Your Stats
🎡 **Total Tracks Played:** {stats.get('total_plays', 0)}
πŸ‘ **Liked Tracks:** {liked_count}
πŸ‘Ž **Disliked Tracks:** {stats.get('total_dislikes', 0)}
*Your liked tracks have a 30% chance of playing during music segments!*
"""
def get_stats():
"""Get listening statistics for the current user"""
user_id = radio_state.get("user_id")
if not user_id:
return """πŸ“Š **Your Listening Stats**
*Log in or create an account to track your listening history!*
"""
# Get user-specific stats from user_memory
user_stats = user_memory.get_user_stats(user_id)
liked_tracks = user_memory.get_liked_tracks(user_id)
disliked_tracks = user_memory.get_disliked_tracks(user_id)
play_history = user_memory.get_play_history(user_id)
# Count by type from play history
music_count = sum(1 for t in play_history if t.get('source') in ['youtube', 'soundcloud'])
return f"""πŸ“Š **Your Listening Stats**
πŸ‘€ **User ID:** {user_id}
🎡 **Total Tracks Played:** {user_stats.get('total_plays', 0)}
πŸ‘ **Liked Tracks:** {len(liked_tracks)}
πŸ‘Ž **Disliked Tracks:** {len(disliked_tracks)}
πŸ“œ **Recent History:** {len(play_history)} tracks
*Your liked tracks have a 30% chance of playing during music segments!*
*Disliked tracks will be skipped automatically.*
"""
# Custom CSS for beautiful radio station UI
custom_css = """
#radio-header {
text-align: center;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 2rem;
border-radius: 10px;
margin-bottom: 1rem;
}
#now-playing {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
color: white;
padding: 1.5rem;
border-radius: 10px;
text-align: center;
font-size: 1.2em;
font-weight: bold;
margin: 1rem 0;
}
#segment-display {
background: #f8f9fa;
padding: 2rem;
border-radius: 10px;
min-height: 200px;
border-left: 4px solid #667eea;
}
.control-button {
font-size: 1.1em !important;
padding: 0.8rem !important;
}
#stats-panel {
background: linear-gradient(135deg, #84fab0 0%, #8fd3f4 100%);
padding: 1.5rem;
border-radius: 10px;
color: #333;
}
"""
# Auto-play JavaScript (injected in head)
auto_play_head = """
<script>
(function() {
let autoPlayEnabled = true;
let nextSegmentTimeout = null;
let currentAudioElement = null;
let musicPlayDuration = 60000; // Wait 60 seconds for music to play
function clickNextSegment() {
const buttons = document.querySelectorAll('button');
for (let btn of buttons) {
if (btn.textContent.includes('Next Segment')) {
console.log('⏭️ Auto-clicking Next Segment button...');
btn.click();
return true;
}
}
return false;
}
function onAudioEnded() {
if (!autoPlayEnabled) return;
console.log('🎡 Host audio ended, scheduling next segment in ' + (musicPlayDuration/1000) + 's...');
// Clear any existing timeout
if (nextSegmentTimeout) clearTimeout(nextSegmentTimeout);
// Wait for music/podcast to play, then auto-advance
nextSegmentTimeout = setTimeout(function() {
clickNextSegment();
}, musicPlayDuration);
}
function setupAutoPlay() {
// Find all audio elements in the host_audio container
const audioContainer = document.querySelector('#host_audio');
if (!audioContainer) {
return;
}
const audio = audioContainer.querySelector('audio');
if (!audio) {
return;
}
// If this is a different audio element than before, set up new listener
if (audio !== currentAudioElement) {
console.log('πŸ”„ New audio element detected, setting up auto-play...');
currentAudioElement = audio;
// Remove old listener if exists (defensive)
audio.removeEventListener('ended', onAudioEnded);
// Add new listener
audio.addEventListener('ended', onAudioEnded);
console.log('βœ… Auto-play listener attached to audio element');
}
}
// Set up on load
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', function() {
setupAutoPlay();
});
} else {
setupAutoPlay();
}
// Re-setup when DOM changes (Gradio updates audio element)
const observer = new MutationObserver(function(mutations) {
// Check if audio-related changes
for (let mutation of mutations) {
if (mutation.type === 'childList') {
// Look for audio element changes
const hasAudioChange = Array.from(mutation.addedNodes).some(node =>
node.nodeType === 1 && (node.tagName === 'AUDIO' || node.querySelector && node.querySelector('audio'))
);
if (hasAudioChange || mutation.target.id === 'host_audio' ||
(mutation.target.closest && mutation.target.closest('#host_audio'))) {
setupAutoPlay();
break;
}
}
}
});
// Start observing after a short delay to let Gradio initialize
setTimeout(function() {
observer.observe(document.body, { childList: true, subtree: true });
setupAutoPlay(); // Initial setup
// Also periodically check (backup)
setInterval(setupAutoPlay, 5000);
}, 2000);
// Expose functions globally
window.toggleAutoPlay = function(enabled) {
autoPlayEnabled = enabled;
console.log('Auto-play ' + (enabled ? 'enabled' : 'disabled'));
};
window.setMusicPlayDuration = function(ms) {
musicPlayDuration = ms;
console.log('Music play duration set to ' + (ms/1000) + 's');
};
window.cancelNextSegment = function() {
if (nextSegmentTimeout) {
clearTimeout(nextSegmentTimeout);
nextSegmentTimeout = null;
console.log('⏹️ Next segment cancelled');
}
};
window.playNextNow = function() {
if (nextSegmentTimeout) clearTimeout(nextSegmentTimeout);
clickNextSegment();
};
})();
</script>
"""
# Build Gradio Interface
with gr.Blocks(css=custom_css, title="AI Radio 🎡", theme=gr.themes.Soft(), head=auto_play_head) as demo:
# Hidden state for user ID (persisted via localStorage)
user_id_state = gr.State(value=None)
# Header
gr.HTML("""
<div id="radio-header">
<h1>🎡 AI Radio Station 🎡</h1>
<p style="font-size: 1.2em; margin: 0;">Your Personal AI-Powered Radio Experience</p>
<p style="margin: 0.5rem 0 0 0; opacity: 0.9;">Personalized Music β€’ News β€’ Podcasts β€’ Stories</p>
</div>
""")
with gr.Tabs() as tabs:
# Tab 0: Account/Auth
with gr.Tab("πŸ” Account", id=0):
gr.Markdown("### 🎧 Welcome to AI Radio")
# Hidden state for auth
auth_user_id = gr.Textbox(visible=False, elem_id="auth_user_id")
auth_passphrase = gr.Textbox(visible=False, elem_id="auth_passphrase")
with gr.Group():
auth_status = gr.Markdown(
value="*Loading your account...*",
elem_id="auth_status"
)
# Current passphrase display (shown after login/signup)
passphrase_display = gr.Textbox(
label="πŸ”‘ Your Passphrase (save this!)",
value="",
interactive=False,
visible=True,
elem_id="passphrase_display"
)
gr.Markdown("---")
gr.Markdown("### πŸ”“ Login with Passphrase")
gr.Markdown("*Enter your passphrase to access your saved preferences, liked tracks, and history from any device.*")
with gr.Row():
passphrase_input = gr.Textbox(
label="Enter Passphrase",
placeholder="e.g., swift-tiger-aurora",
scale=3
)
login_btn = gr.Button("πŸ”‘ Login", variant="primary", scale=1)
login_status = gr.Textbox(label="Login Status", interactive=False, visible=True)
gr.Markdown("---")
gr.Markdown("### πŸ†• New User?")
gr.Markdown("*Click below to create a new account. You'll receive a unique passphrase to remember.*")
create_account_btn = gr.Button("✨ Create New Account", variant="secondary")
gr.Markdown("---")
gr.Markdown("""
**πŸ’‘ How it works:**
- If you are here for the first time, create a new account
- Use the same passphrase on any device to access your account
- Just enter your passphrase to log back in
""")
# Tab 1: Preferences
with gr.Tab("βš™οΈ Your Preferences", id=1):
gr.Markdown("### 🎯 Personalize Your Radio Experience")
gr.Markdown("Tell us about your preferences so we can create the perfect radio show for you!")
with gr.Row():
with gr.Column():
name_input = gr.Textbox(
label="πŸ‘€ Your Name",
placeholder="Enter your name",
value="Friend"
)
mood_input = gr.Dropdown(
label="😊 Current Mood",
choices=["happy", "energetic", "calm", "focused", "relaxed"],
value="happy"
)
# Add voice selection dropdown
from tts_service import VOICE_OPTIONS
voice_input = gr.Dropdown(
label="🎀 Host Voice",
choices=list(VOICE_OPTIONS.keys()),
value="Rachel (Female)",
info="Choose the voice for your radio host"
)
with gr.Column():
genres_input = gr.Dropdown(
label="🎡 Favorite Music Genres",
choices=["pop", "rock", "jazz", "classical", "electronic", "country", "indie", "rap", "blues", "folk", "hip-hop"],
multiselect=True,
value=["rap"]
)
interests_input = gr.Dropdown(
label="πŸ“° News Interests",
choices=["technology", "world", "business", "entertainment", "science", "sports"],
multiselect=True,
value=["technology", "world"]
)
podcast_input = gr.Dropdown(
label="πŸŽ™οΈ Podcast Interests",
choices=["technology", "business", "comedy", "education", "news", "true-crime"],
multiselect=True,
value=["technology"]
)
gr.Markdown("### πŸŽ›οΈ Content Filter")
gr.Markdown("Choose what types of content you want to hear:")
with gr.Row():
music_filter = gr.Checkbox(label="🎡 Music", value=True)
news_filter = gr.Checkbox(label="πŸ“° News", value=True)
podcast_filter = gr.Checkbox(label="πŸŽ™οΈ Podcasts", value=True)
story_filter = gr.Checkbox(label="πŸ“– AI Stories", value=True)
save_pref_btn = gr.Button("πŸ’Ύ Save & Go to Radio ▢️", variant="primary", size="lg")
pref_status = gr.Textbox(label="Status", interactive=False)
# Tab 2: Radio Player
with gr.Tab("πŸ“» Radio Player", id=2):
gr.Markdown("### 🎧 Start your personalized radio experience!")
now_playing = gr.HTML("<div id='now-playing'>πŸ“» Ready to start</div>")
with gr.Row():
start_btn = gr.Button("▢️ Generate & Play", variant="primary", size="lg", elem_classes="control-button")
next_btn = gr.Button("⏭️ Next Segment", variant="secondary", size="lg", elem_classes="control-button")
stop_btn = gr.Button("⏹️ Stop", variant="stop", size="lg", elem_classes="control-button")
with gr.Row():
voice_audio = gr.Audio(
label="🎀 Record Your Song Request",
type="filepath",
sources=["microphone"],
format="wav"
)
voice_btn = gr.Button("🎀 Ask For Song", variant="primary", size="lg", elem_classes="control-button")
voice_status = gr.Textbox(label="Voice Request Status", value="Record your voice request above, then click the button", interactive=False)
# Like/Dislike buttons for current track
with gr.Row():
like_btn = gr.Button("πŸ‘ Like This Song", variant="secondary", size="sm")
dislike_btn = gr.Button("πŸ‘Ž Don't Play Again", variant="secondary", size="sm")
like_status = gr.Textbox(label="Rating Status", value="Rate the current song!", interactive=False, scale=2)
progress_text = gr.Textbox(label="Progress", value="Ready to start", interactive=False, visible=False)
segment_info = gr.Markdown("**Welcome!** Set your preferences and start the radio.", elem_id="segment-display")
llm_script = gr.Textbox(
label="🧠 Model Script (LLM answer)",
value="Model-generated script will appear here for each segment.",
interactive=False,
lines=6,
visible=False
)
gr.Markdown("**πŸ’‘ Tip:** Host speech plays first, then music/podcasts will stream automatically!")
audio_output = gr.Audio(label="πŸ”Š Host Speech", autoplay=True, type="filepath", elem_id="host_audio")
music_player = gr.HTML(label="🎡 Music/Podcast Player (streaming)")
# Timer for delayed player loading (start disabled)
player_timer = gr.Timer(value=0, active=False)
player_timer.tick(
fn=show_pending_player,
inputs=[],
outputs=[music_player, player_timer]
)
status_text = gr.Textbox(label="Status", value="Ready", interactive=False, visible=False)
# Connect buttons
start_btn.click(
fn=start_and_play_first_segment,
inputs=[],
outputs=[status_text, segment_info, audio_output, music_player, progress_text, now_playing, llm_script, player_timer]
)
next_btn.click(
fn=play_next_segment,
inputs=[],
outputs=[segment_info, audio_output, music_player, progress_text, now_playing, llm_script, player_timer]
)
stop_btn.click(
fn=stop_radio,
inputs=[],
outputs=[status_text, audio_output, music_player, progress_text, now_playing, player_timer],
js="() => { if(window.cancelNextSegment) window.cancelNextSegment(); }"
)
# Voice input button - process recorded audio
voice_btn.click(
fn=handle_voice_request,
inputs=[voice_audio],
outputs=[voice_status, audio_output, music_player, player_timer]
)
# Like/Dislike buttons
like_btn.click(
fn=like_current_track,
inputs=[],
outputs=[like_status]
)
dislike_btn.click(
fn=dislike_current_track,
inputs=[],
outputs=[like_status]
)
# Connect save preferences button (needs to be after both tabs are defined)
save_pref_btn.click(
fn=save_preferences,
inputs=[name_input, genres_input, interests_input, podcast_input, mood_input,
music_filter, news_filter, podcast_filter, story_filter, voice_input],
outputs=[pref_status]
).then(
fn=lambda: gr.Tabs(selected=2), # Go to Radio Player tab (id=2)
outputs=[tabs]
)
# Auth button handlers
def handle_login(passphrase):
"""Handle login with passphrase - returns all data including preferences"""
message, user_id, pp = authenticate_with_passphrase(passphrase)
if user_id:
# Get preferences to autofill
prefs = get_saved_preferences()
# Returns: message, user_id, passphrase, passphrase_display, auth_status_md,
# name, genres, interests, podcasts, mood, music, news, podcast, story, voice
return (message, user_id, pp, pp, f"βœ… Logged in! Passphrase: **{pp}**",
prefs[0], prefs[1], prefs[2], prefs[3], prefs[4],
prefs[5], prefs[6], prefs[7], prefs[8], prefs[9])
# Failed login - return empty/default values
return (message, "", "", "", "*Login failed*",
"Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy",
True, True, True, True, "Rachel (Female)")
def handle_create_account():
"""Handle new account creation"""
message, user_id, passphrase = create_new_account()
# Return with passphrase display
return message, user_id, passphrase, passphrase
def handle_init_from_cookie(stored_user_id):
"""Initialize user from cookie-stored user ID - returns preferences too"""
if stored_user_id and stored_user_id.strip():
user_id, passphrase, is_new = init_user(stored_user_id)
prefs = get_saved_preferences()
if is_new:
message = f"""πŸŽ‰ Welcome to AI Radio!
Your personal passphrase is:
πŸ”‘ **{passphrase}**
⚠️ **Save this passphrase!** You'll need it to access your account from other devices."""
else:
stats = user_memory.get_user_stats(user_id)
message = f"""βœ… Welcome back!
Your passphrase: **{passphrase}**
πŸ“Š Your stats:
β€’ Liked tracks: {stats.get('total_likes', 0)}
β€’ Play history: {stats.get('total_plays', 0)} tracks"""
# Return: message, user_id, passphrase, passphrase_display,
# name, genres, interests, podcasts, mood, music, news, podcast, story, voice
return (message, user_id, passphrase, passphrase,
prefs[0], prefs[1], prefs[2], prefs[3], prefs[4],
prefs[5], prefs[6], prefs[7], prefs[8], prefs[9])
else:
# No cookie - return defaults
return ("*No saved account found. Create a new account or log in with your passphrase.*",
"", "", "",
"Friend", ["pop", "rock"], ["technology", "world"], ["technology"], "happy",
True, True, True, True, "Rachel (Female)")
# Login button - single handler that returns everything
login_btn.click(
fn=handle_login,
inputs=[passphrase_input],
outputs=[login_status, auth_user_id, auth_passphrase, passphrase_display, auth_status,
name_input, genres_input, interests_input, podcast_input, mood_input,
music_filter, news_filter, podcast_filter, story_filter, voice_input]
).then(
# Save to localStorage via JS
fn=None,
inputs=[auth_user_id, auth_passphrase],
outputs=[],
js="(userId, passphrase) => { if(userId && passphrase) { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); } }"
).then(
# Go to Radio Player tab after successful login
fn=lambda uid: gr.Tabs(selected=2) if uid else gr.Tabs(selected=0),
inputs=[auth_user_id],
outputs=[tabs]
)
# Create account button
create_account_btn.click(
fn=handle_create_account,
inputs=[],
outputs=[auth_status, auth_user_id, auth_passphrase, passphrase_display]
).then(
# Save to localStorage via JS
fn=None,
inputs=[auth_user_id, auth_passphrase],
outputs=[],
js="(userId, passphrase) => { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); }"
).then(
# Go to preferences tab
fn=lambda: gr.Tabs(selected=1),
outputs=[tabs]
)
# Initialize from localStorage on page load
demo.load(
fn=None,
inputs=[],
outputs=[auth_user_id],
js="() => { return localStorage.getItem('ai_radio_user_id') || ''; }"
).then(
fn=handle_init_from_cookie,
inputs=[auth_user_id],
outputs=[auth_status, auth_user_id, auth_passphrase, passphrase_display,
name_input, genres_input, interests_input, podcast_input, mood_input,
music_filter, news_filter, podcast_filter, story_filter, voice_input]
).then(
# Save to localStorage (in case new user was created)
fn=None,
inputs=[auth_user_id, auth_passphrase],
outputs=[],
js="(userId, passphrase) => { if(userId && passphrase) { localStorage.setItem('ai_radio_user_id', userId); localStorage.setItem('ai_radio_passphrase', passphrase); } }"
)
# Tab 3: Liked Tracks
with gr.Tab("❀️ Liked Tracks", id=3):
gr.Markdown("### ❀️ Your Liked Tracks")
gr.Markdown("Songs you've liked will be played again sometimes! (30% chance during music segments)")
liked_display = gr.Markdown("No liked tracks yet. Like some songs while listening!")
user_stats_display = gr.Markdown("")
refresh_liked_btn = gr.Button("πŸ”„ Refresh Liked Tracks", variant="secondary")
refresh_liked_btn.click(
fn=get_liked_tracks_display,
inputs=[],
outputs=[liked_display]
).then(
fn=get_user_stats_display,
inputs=[],
outputs=[user_stats_display]
)
# Tab 4: Statistics
with gr.Tab("πŸ“Š Your Stats", id=4):
gr.Markdown("### πŸ“ˆ Your Listening Statistics")
stats_display = gr.Markdown(elem_id="stats-panel")
refresh_stats_btn = gr.Button("πŸ”„ Refresh Stats", variant="secondary")
refresh_stats_btn.click(
fn=get_stats,
inputs=[],
outputs=[stats_display]
)
# Load stats on tab open
demo.load(
fn=get_stats,
inputs=[],
outputs=[stats_display]
)
# Tab 5: About
with gr.Tab("ℹ️ About", id=5):
gr.Markdown("""
# 🎡 AI Radio - Your Personal Radio Station
## About This App
AI Radio is an intelligent, personalized radio station powered by cutting-edge AI technology.
It creates a unique listening experience tailored to your preferences, mood, and interests.
## 🌟 Features
- **🎡 Personalized Music**: Curated tracks from YouTube based on your favorite genres and mood
- **πŸ“° Custom News**: Real-time news updates on topics you care about
- **πŸŽ™οΈ Podcast Recommendations**: Discover interesting podcasts matching your interests
- **πŸ“– AI-Generated Stories**: Entertaining stories and fun facts
- **πŸ€– AI Host**: Dynamic AI radio host that introduces segments (choose your preferred voice!)
- **πŸ’Ύ Smart Recommendations**: RAG system learns from your listening history
- **πŸ” User Accounts**: Passphrase-based authentication with saved preferences
## 🧠 How It Works
### MCP Servers (Modular Tools)
Three specialized **MCP (Model Context Protocol)** servers act as tools:
- **Music Server**: Searches YouTube for music tracks matching your preferences
- **News Server**: Fetches real-time news from RSS feeds
- **Podcast Server**: Discovers podcasts on YouTube
### LLM (Large Language Model)
**Nebius GPT-OSS-120B** generates all text content:
- Personalized host commentary and introductions
- Conversational news scripts
- Entertaining stories and fun facts
- All adapted to your mood and preferences
### RAG System (Retrieval-Augmented Generation)
**LlamaIndex-powered RAG** provides context-aware personalization:
- Stores your preferences and listening history
- Retrieves context for better recommendations
- Learns from your behavior to improve suggestions over time
## πŸ› οΈ Technology Stack
- **Gradio**: Beautiful, interactive UI
- **Nebius GPT-OSS-120B** (OpenAI-compatible): LLM for content generation
- **ElevenLabs**: High-quality text-to-speech for voice generation
- **LlamaIndex**: RAG system for personalized recommendations
- **MCP Servers**: Modular tools for music, news, and podcasts
- **yt-dlp**: YouTube music and podcast search
- **YouTube/SoundCloud**: Music and podcast streaming
## πŸ† Built for MCP 1st Birthday Competition
This app demonstrates:
- βœ… **Autonomous Agent Behavior**: Planning, reasoning, and execution
- βœ… **MCP Servers as Tools**: Modular music, news, and podcast servers
- βœ… **RAG System**: Context-aware personalization with LlamaIndex
- βœ… **LLM Integration**: Content generation with Nebius GPT-OSS-120B
- βœ… **Gradio Interface**: Seamless user experience
## πŸ“ How to Use
1. **Create Account**: Get your unique passphrase (saved in browser)
2. **Set Preferences**: Choose genres, interests, and mood
3. **Start Radio**: Click "Generate & Play" to begin your personalized show
4. **Interact**: Like/dislike tracks, request songs by voice
5. **Track Stats**: View your listening history and statistics
---
Made with ❀️ for the MCP 1st Birthday Competition
""")
# Footer
gr.HTML("""
<div style="text-align: center; padding: 2rem; margin-top: 2rem; border-top: 1px solid #ddd;">
<p style="color: #666;">🎡 AI Radio - Personalized Radio for Everyone 🎡</p>
<p style="color: #999; font-size: 0.9em;">Track: MCP in Action - Consumer Applications</p>
</div>
""")
# Launch the app
# Note: For HuggingFace Spaces, use app.py at root level instead
if __name__ == "__main__":
import os
port = int(os.getenv("PORT", 7871))
demo.launch(
server_name="0.0.0.0",
server_port=port,
share=False
)