"""AI Radio - Personalized Radio Station with AI Host"""
import gradio as gr
import os
import json
import time
import re
from typing import Dict, Any, List, Optional
import threading
from pydub import AudioSegment
# Project root is two directory levels above this file (the parent of the
# directory that contains it).
_SOURCE_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(_SOURCE_DIR)

# Generated audio clips live under <root>/audio; create it on import.
AUDIO_DIR = os.path.join(PROJECT_ROOT, "audio")
os.makedirs(AUDIO_DIR, exist_ok=True)

# Log output lives under <root>/logs; create it on import.
LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
os.makedirs(LOG_DIR, exist_ok=True)
from config import get_config
from radio_agent import RadioAgent
from tts_service import TTSService
from rag_system import RadioRAGSystem
from voice_input import VoiceInputService
from user_memory import UserMemoryService
def create_youtube_player_html(youtube_id: str, title: str, artist: str, url: str) -> str:
    """Create YouTube player iframe HTML.

    Builds a small HTML fragment with a caption, an embedded YouTube iframe
    for ``youtube_id`` and a fallback link to ``url``.

    Args:
        youtube_id: YouTube video ID used to build the embed URL. When empty,
            no iframe is emitted.
        title: Track title shown in the caption and used as the iframe title.
        artist: Artist name shown in the caption.
        url: Link to the video page. Only http(s) URLs are rendered as a link.

    Returns:
        HTML fragment as a string, starting and ending with a newline.
    """
    # Local imports keep this block self-contained.
    from html import escape
    from urllib.parse import quote

    # Title/artist/url come from external search results, so escape them
    # before placing them into markup.
    safe_title = escape(str(title))
    safe_artist = escape(str(artist))

    parts = [
        '<div class="ai-radio-player">',
        f"<p><strong>{safe_title}</strong> by {safe_artist}</p>",
    ]
    if youtube_id:
        embed_src = "https://www.youtube.com/embed/" + quote(str(youtube_id), safe="") + "?autoplay=1"
        parts.append(
            f'<iframe width="100%" height="315" src="{embed_src}" title="{safe_title}" '
            'frameborder="0" allow="autoplay; encrypted-media" allowfullscreen></iframe>'
        )
    # Refuse non-http(s) schemes (e.g. javascript:) for the fallback link.
    if str(url).startswith(("http://", "https://")):
        parts.append(
            f'<p><a href="{escape(str(url))}" target="_blank" rel="noopener noreferrer">'
            "Open on YouTube</a></p>"
        )
    parts.append("</div>")
    return "\n" + "\n".join(parts) + "\n"
def create_podcast_player_html(youtube_id: str, title: str, host: str, url: str) -> str:
    """Create podcast YouTube player iframe HTML.

    Builds a small HTML fragment with a caption, an embedded YouTube iframe
    for ``youtube_id`` and a fallback link to ``url``.

    Args:
        youtube_id: YouTube video ID used to build the embed URL. When empty,
            no iframe is emitted.
        title: Podcast title shown in the caption and used as the iframe title.
        host: Podcast host name shown in the caption.
        url: Link to the video page. Only http(s) URLs are rendered as a link.

    Returns:
        HTML fragment as a string, starting and ending with a newline.
    """
    # Local imports keep this block self-contained.
    from html import escape
    from urllib.parse import quote

    # Title/host/url come from external search results, so escape them
    # before placing them into markup.
    safe_title = escape(str(title))
    safe_host = escape(str(host))

    parts = [
        '<div class="ai-radio-player ai-radio-podcast">',
        f"<p><strong>{safe_title}</strong> hosted by {safe_host}</p>",
    ]
    if youtube_id:
        embed_src = "https://www.youtube.com/embed/" + quote(str(youtube_id), safe="") + "?autoplay=1"
        parts.append(
            f'<iframe width="100%" height="315" src="{embed_src}" title="{safe_title}" '
            'frameborder="0" allow="autoplay; encrypted-media" allowfullscreen></iframe>'
        )
    # Refuse non-http(s) schemes (e.g. javascript:) for the fallback link.
    if str(url).startswith(("http://", "https://")):
        parts.append(
            f'<p><a href="{escape(str(url))}" target="_blank" rel="noopener noreferrer">'
            "Open on YouTube</a></p>"
        )
    parts.append("</div>")
    return "\n" + "\n".join(parts) + "\n"
def create_placeholder_html(title: str, artist: str, delay_s: float, media_type: str = "music") -> str:
    """Create placeholder HTML shown while waiting for host speech to end.

    Args:
        title: Title of the upcoming track or podcast.
        artist: Artist (or host) name shown under the title.
        delay_s: Seconds until the real player appears; rendered rounded to
            a whole number.
        media_type: ``"music"`` selects the music colour and icon; any other
            value selects the podcast colour and icon.

    Returns:
        HTML fragment as a string, starting and ending with a newline.
    """
    # Local import keeps this block self-contained.
    from html import escape

    is_music = media_type == "music"
    bg_color = "#1a1a2e" if is_music else "#6b46c1"
    icon = "🎵" if is_music else "🎙️"

    # Title/artist originate from external search results; escape before
    # interpolating into markup.
    safe_title = escape(str(title))
    safe_artist = escape(str(artist))

    lines = [
        f'<div style="background: {bg_color}; color: #ffffff; padding: 20px; '
        'border-radius: 12px; text-align: center;">',
        f"<h3>{icon} {safe_title}</h3>",
        f"<p>by {safe_artist}</p>",
        "<div>⏳</div>",
        f"<p>Video starts in {delay_s:.0f}s...</p>",
        "<p>After host finishes speaking</p>",
        "</div>",
    ]
    return "\n" + "\n".join(lines) + "\n"
def get_pending_player():
    """Return the queued player HTML once its scheduled time has arrived.

    Reads ``radio_state["pending_player"]``. When an entry exists and the
    current time has reached its ``ready_at`` timestamp, the entry is cleared
    and its ``html`` value is returned.

    Returns:
        The queued HTML string, or ``None`` when nothing is queued or the
        entry is not due yet.
    """
    queued = radio_state.get("pending_player")
    if not queued:
        return None
    if time.time() < queued.get("ready_at", 0):
        return None

    player_markup = queued.get("html", "")
    radio_state["pending_player"] = None  # consume it so it is handed out once
    return player_markup
# Global state
# Single module-level dict shared by the UI callbacks in this file; the
# functions below read and mutate it in place.
radio_state = {
    "is_playing": False,
    "is_stopped": False, # User explicitly stopped (vs just paused)
    "current_segment_index": 0,  # Index of the next segment to play in planned_show
    "planned_show": [],  # Segment list returned by agent.plan_radio_show()
    "user_preferences": {},  # Empty until saved, loaded from user memory, or defaulted
    "stop_flag": False,
    # News segments are voiced in batches: text still waiting to be spoken,
    # plus counters used for the "News batch i/n" progress label.
    "current_news_batches": [],
    "news_total_batches": 0,
    "news_batches_played": 0,
    "last_segment": None,  # Most recently played segment
    "current_track": None, # Currently playing track for like/dislike
    "user_id": None, # Current user ID
    "auto_play": True, # Auto-play next segment when current finishes
    # Which segment types may be scheduled; passed to agent.plan_radio_show()
    "content_filter": {
        "music": True,
        "news": True,
        "podcasts": True,
        "stories": True
    },
    # Pending media player (to be added after host speech ends)
    # NOTE(review): show_pending_player() reads a "pending_player_html" key
    # that is not defined here - confirm which key is intended.
    "pending_player": None, # {"html": "...", "ready_at": timestamp}
}
# Initialize services
# Module-level singletons created at import time and shared by every
# callback in this file.
config = get_config()
agent = RadioAgent(config)
# NOTE(review): an earlier comment here said an open-source local TTS is used
# instead of ElevenLabs, yet the ElevenLabs key and voice ID are still passed.
# They are read with getattr so a config lacking those attributes yields empty
# strings - confirm which backend TTSService actually uses.
tts_service = TTSService(api_key=getattr(config, "elevenlabs_api_key", ""), voice_id=getattr(config, "elevenlabs_voice_id", ""))
voice_input_service = VoiceInputService()
user_memory = UserMemoryService()
def save_preferences(name: str, favorite_genres: List[str], interests: List[str],
                     podcast_interests: List[str], mood: str,
                     music_filter: bool, news_filter: bool, podcast_filter: bool, story_filter: bool,
                     voice_name: Optional[str] = None) -> str:
    """Save user preferences to the RAG system and to user memory.

    Creates the user on first use, resolves the selected voice, updates the
    shared ``radio_state`` and ``tts_service``, and persists the preferences.

    Args:
        name: Listener name; falls back to ``"Friend"`` when empty.
        favorite_genres: Music genres; falls back to ``["pop"]`` when empty.
        interests: News interests; falls back to ``["world"]`` when empty.
        podcast_interests: Podcast topics; falls back to ``["technology"]``.
        mood: Current mood; falls back to ``"happy"`` when empty.
        music_filter: Whether music segments are enabled.
        news_filter: Whether news segments are enabled.
        podcast_filter: Whether podcast segments are enabled.
        story_filter: Whether story segments are enabled.
        voice_name: Key into ``VOICE_OPTIONS``. ``None`` or an unknown name
            selects the default voice.

    Returns:
        A confirmation message for display in the UI.
    """
    default_voice_name = "Rachel (Female)"
    default_voice_id = "21m00Tcm4TlvDq8ikWAM"  # Default Rachel

    # Initialize user if not already done
    if not radio_state.get("user_id"):
        user_id, _, _ = user_memory.get_or_create_user()
        radio_state["user_id"] = user_id
        print(f"👤 User initialized: {radio_state['user_id']}")

    # Map voice name to voice_id. The stored name and ID must always describe
    # the same voice, so an unknown name falls back to the default for BOTH
    # (previously the unknown name was stored next to the default ID).
    from tts_service import VOICE_OPTIONS
    if voice_name and voice_name in VOICE_OPTIONS:
        selected_voice_name = voice_name
        selected_voice_id = VOICE_OPTIONS[voice_name]
    else:
        selected_voice_name = default_voice_name
        selected_voice_id = default_voice_id

    preferences = {
        "name": name or "Friend",
        "favorite_genres": favorite_genres or ["pop"],
        "interests": interests or ["world"],
        "podcast_interests": podcast_interests or ["technology"],
        "mood": mood or "happy",
        "voice_id": selected_voice_id,
        "voice_name": selected_voice_name,
        "content_filter": {
            "music": music_filter,
            "news": news_filter,
            "podcasts": podcast_filter,
            "stories": story_filter,
        },
        "timestamp": time.time(),
    }

    # Update content filter and the in-memory copy of the preferences
    radio_state["content_filter"] = preferences["content_filter"]
    radio_state["user_preferences"] = preferences

    # Update TTS service with new voice (attribute assignment only, so no
    # `global` declaration is needed)
    tts_service.voice_id = selected_voice_id
    print(f"🎤 Voice updated to: {selected_voice_name} (ID: {selected_voice_id})")

    # Save to RAG system with user_id
    user_id = radio_state.get("user_id")
    agent.rag_system.store_user_preferences(preferences, user_id=user_id)

    # Save to user memory for persistence
    if user_id:
        user_memory.save_user_preferences(user_id, preferences)
        print(f"💾 Preferences saved for user {user_id}")

    return f"✅ Preferences saved! Welcome, {preferences['name']}! Your personalized radio is ready."
def get_saved_preferences() -> tuple:
    """Fetch the current user's stored preferences for pre-filling the form.

    Falls back to built-in defaults when no user is set or nothing is stored.

    Returns:
        Tuple of (name, genres, interests, podcast_interests, mood, music, news, podcast, story, voice_name)
    """
    fallback_voice = "Rachel (Female)"
    recognised_voices = (
        "Rachel (Female)", "Lera (Female)", "Bella (Female)", "Antoni (Male)",
        "Arnold (Male)", "Adam (Male)", "Domi (Female)", "Elli (Female)",
        "Josh (Male)", "Sam (Male)",
    )

    def _form_defaults() -> tuple:
        # Built on every call so callers never share the list objects.
        return (
            "Friend", ["pop", "rock"], ["technology", "world"], ["technology"],
            "happy", True, True, True, True, fallback_voice,
        )

    current_user = radio_state.get("user_id")
    if not current_user:
        return _form_defaults()

    stored = user_memory.get_user_preferences(current_user)
    if not stored:
        return _form_defaults()

    filters = stored.get("content_filter", {})

    # Prefer the stored display name; when it is missing or unrecognised,
    # recover it from the stored voice ID, else use the fallback voice.
    chosen_voice = stored.get("voice_name", fallback_voice)
    if not chosen_voice or chosen_voice not in recognised_voices:
        from tts_service import VOICE_OPTIONS
        stored_voice_id = stored.get("voice_id", "21m00Tcm4TlvDq8ikWAM")
        chosen_voice = next(
            (label for label, vid in VOICE_OPTIONS.items() if vid == stored_voice_id),
            fallback_voice,
        )

    return (
        stored.get("name", "Friend"),
        stored.get("favorite_genres", ["pop", "rock"]),
        stored.get("interests", ["technology", "world"]),
        stored.get("podcast_interests", ["technology"]),
        stored.get("mood", "happy"),
        filters.get("music", True),
        filters.get("news", True),
        filters.get("podcasts", True),
        filters.get("stories", True),
        chosen_voice,
    )
def start_radio_stream():
    """Start the radio stream by planning a new show.

    Ensures preferences exist (in-memory, then user memory, then defaults),
    refuses to start when a show is already playing, otherwise plans a
    30-minute show and resets the playback counters in ``radio_state``.

    Returns:
        5-tuple ``(status_message, None, None, None, "")`` for the UI.
    """
    default_voice_id = "21m00Tcm4TlvDq8ikWAM"

    def _all_content_enabled() -> dict:
        # Fresh dict per use so state entries never alias each other.
        return {"music": True, "news": True, "podcasts": True, "stories": True}

    # Use saved preferences, or set defaults if none exist
    if not radio_state["user_preferences"]:
        # Try to load from user memory
        user_id = radio_state.get("user_id")
        saved = user_memory.get_user_preferences(user_id) if user_id else None
        if saved:
            radio_state["user_preferences"] = saved
            radio_state["content_filter"] = saved.get("content_filter", _all_content_enabled())
            # Apply the saved voice, matching start_and_play_first_segment()
            tts_service.voice_id = saved.get("voice_id", default_voice_id)
        else:
            # Still no preferences? Use defaults
            radio_state["user_preferences"] = {
                "name": "Friend",
                "favorite_genres": ["pop", "rock"],
                "interests": ["technology", "world"],
                "podcast_interests": ["technology"],
                "mood": "happy",
                "voice_id": default_voice_id,
                "voice_name": "Rachel (Female)",
            }
            radio_state["content_filter"] = _all_content_enabled()
            # Apply the default voice, matching start_and_play_first_segment()
            tts_service.voice_id = default_voice_id

    if radio_state["is_playing"]:
        return "📻 Radio is already playing!", None, None, None, ""

    # Plan the show with content filter
    user_id = radio_state.get("user_id")
    show_plan = agent.plan_radio_show(
        user_id=user_id,
        user_preferences=radio_state["user_preferences"],
        duration_minutes=30,
        content_filter=radio_state["content_filter"],
    )

    radio_state["planned_show"] = show_plan
    radio_state["current_segment_index"] = 0
    radio_state["is_playing"] = True
    radio_state["stop_flag"] = False
    # A freshly planned show is not in the "stopped" state; cleared here for
    # consistency with start_and_play_first_segment().
    radio_state["is_stopped"] = False
    radio_state["current_news_batches"] = []
    radio_state["news_total_batches"] = 0
    radio_state["news_batches_played"] = 0
    radio_state["last_segment"] = None

    return "🎵 Starting your personalized radio show...", None, None, None, ""
def show_pending_player():
    """Called by timer to finally show the media player.

    Looks for the queued player markup in ``radio_state``. The legacy
    ``"pending_player_html"`` key is honoured first; otherwise the markup is
    taken from ``radio_state["pending_player"]["html"]``, which is the entry
    documented in the state dict and read by ``get_pending_player()``. An
    entry taken from ``pending_player`` is cleared so it is shown only once.

    Returns:
        Tuple ``(player_html, timer)`` where ``timer`` is a deactivated
        ``gr.Timer`` so the callback does not fire again.
    """
    player_html = radio_state.get("pending_player_html") or ""
    if not player_html:
        # Previously only "pending_player_html" was read; nothing visible in
        # this file sets that key, so the player came back empty.
        pending = radio_state.get("pending_player") or {}
        player_html = pending.get("html", "")
        if player_html:
            radio_state["pending_player"] = None

    print("⏰ Timer fired: Showing pending media player now")
    # Return player HTML and disable timer
    return player_html, gr.Timer(value=0, active=False)
def start_and_play_first_segment():
    """One-shot UI helper: start or resume the show and play a segment.

    - If resuming from stop: continue from the next segment.
    - If a show is genuinely in progress: report that and change nothing.
    - Otherwise: reset, plan a new show and play its first segment.

    Returns:
        8-tuple ``(status, segment_info, host_audio, music_html, progress,
        now_playing, llm_script, timer_config)`` - everything needed to
        update the UI in a single call.
    """
    default_voice_id = "21m00Tcm4TlvDq8ikWAM"

    # NOTE(review): a few log strings below start with a mis-encoded glyph
    # ("π") whose original emoji cannot be recovered; they are left unchanged.
    print("▶️ [start_and_play_first_segment] Starting...")

    # Initialize user if not already done
    if not radio_state.get("user_id"):
        user_id, _, _ = user_memory.get_or_create_user()
        radio_state["user_id"] = user_id
        print(f"👤 User initialized: {radio_state['user_id']}")

    # Use saved preferences, or set defaults if none exist
    if not radio_state["user_preferences"]:
        # Try to load from user memory
        user_id = radio_state.get("user_id")
        if user_id:
            saved = user_memory.get_user_preferences(user_id)
            if saved:
                radio_state["user_preferences"] = saved
                radio_state["content_filter"] = saved.get("content_filter", {
                    "music": True, "news": True, "podcasts": True, "stories": True
                })
                # Update TTS service with saved voice
                voice_id = saved.get("voice_id", default_voice_id)
                tts_service.voice_id = voice_id
                print(f"π Loaded saved preferences for user {user_id}")
                print(f"🎤 Voice set to: {saved.get('voice_name', 'Rachel (Female)')} (ID: {voice_id})")

        # Still no preferences? Use defaults
        if not radio_state["user_preferences"]:
            radio_state["user_preferences"] = {
                "name": "Friend",
                "favorite_genres": ["pop", "rock"],
                "interests": ["technology", "world"],
                "podcast_interests": ["technology"],
                "mood": "happy",
                "voice_id": default_voice_id,
                "voice_name": "Rachel (Female)",
            }
            radio_state["content_filter"] = {
                "music": True, "news": True, "podcasts": True, "stories": True
            }
            # Update TTS service with default voice
            tts_service.voice_id = default_voice_id
            print("π Using default preferences")

    planned = radio_state.get("planned_show")

    # Check if resuming from stopped state (only if show was manually stopped, not completed)
    if radio_state.get("is_stopped") and planned and radio_state["current_segment_index"] < len(planned):
        print("▶️ Resuming from stopped state...")
        radio_state["is_playing"] = True
        radio_state["is_stopped"] = False
        radio_state["stop_flag"] = False
        # Play next segment (continuing from where we stopped)
        segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment()
        return (
            "▶️ Resuming radio...",
            segment_info,
            host_audio,
            music_html,
            progress,
            now_playing,
            llm_script,
            timer_config,
        )

    # If show completed or no valid show to resume, start fresh.
    # Also check if is_playing is True but show is actually completed.
    if radio_state.get("is_playing") and planned:
        if radio_state["current_segment_index"] >= len(planned):
            # Show completed, reset and start fresh
            print("π Show completed, resetting for new show...")
            reset_radio()
        else:
            # Show is actually playing
            return (
                "📻 Radio is already playing!",
                "**Welcome!** Set your preferences and start the radio.",
                None, None, "Ready to start", "📻 AI Radio",
                "Model-generated script will appear here for each segment.",
                gr.Timer(value=0, active=False),
            )
    elif radio_state.get("is_playing"):
        # is_playing is True but no planned show - reset
        print("π is_playing=True but no planned_show, resetting...")
        reset_radio()

    # The module-level `time` import is used here; the former function-scope
    # `import time` rebound the name as a local and was redundant.
    t0 = time.time()

    # Fresh start - reset and plan new show
    reset_radio()

    # Step 1: Plan skeleton show (YouTube searches happen here)
    print(" [1/3] Planning show (searching YouTube)...")
    user_id = radio_state.get("user_id")
    show_plan = agent.plan_radio_show(
        user_id=user_id,
        user_preferences=radio_state["user_preferences"],
        duration_minutes=30,
        content_filter=radio_state["content_filter"],
    )
    print(f" [1/3] Done in {time.time()-t0:.1f}s, segments: {len(show_plan)}")

    radio_state["planned_show"] = show_plan
    radio_state["current_segment_index"] = 0
    radio_state["is_playing"] = True
    radio_state["stop_flag"] = False
    radio_state["is_stopped"] = False

    # Step 2 & 3: Generate and play first segment (LLM + TTS inside play_next_segment)
    print(" [2/3] Generating first segment (LLM)...")
    t1 = time.time()
    segment_info, host_audio, music_html, progress, now_playing, llm_script, timer_config = play_next_segment()
    print(f" [2/3] Done in {time.time()-t1:.1f}s")
    print(f" [3/3] Total time: {time.time()-t0:.1f}s")
    print(f" Host audio file: {host_audio}")

    return (
        "🎵 Starting your personalized radio show...",
        segment_info,
        host_audio,
        music_html,
        progress,
        now_playing,
        llm_script,
        timer_config,
    )
def play_next_segment():
"""Play the next segment in the show - returns host audio and music audio separately"""
# If stopped, resume first
if radio_state.get("is_stopped"):
print("βΆοΈ Resuming from stopped state in play_next_segment...")
radio_state["is_stopped"] = False
radio_state["stop_flag"] = False
radio_state["is_playing"] = True
if not radio_state["is_playing"]:
# If we have a planned show, try to resume
if radio_state.get("planned_show"):
radio_state["is_playing"] = True
radio_state["stop_flag"] = False
else:
return "βΈοΈ No show planned. Click 'Generate & Play' to start.", None, None, None, "", "", gr.Timer(value=0, active=False)
if radio_state["stop_flag"]:
radio_state["is_playing"] = False
return "βΈοΈ Radio paused", None, None, None, "", "", gr.Timer(value=0, active=False)
# If there are remaining news TTS batches, play the next one without advancing to a new segment
if radio_state.get("current_news_batches"):
segment = radio_state.get("last_segment")
if segment:
batch_index = radio_state.get("news_batches_played", 1) + 1
next_batch = radio_state["current_news_batches"].pop(0)
radio_state["news_batches_played"] = batch_index
host_audio_file = None
if getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(next_batch)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_{batch_index}.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
# Build display info (same news segment)
segment_info = format_segment_info(segment)
raw_script = get_segment_text(segment)
script_batches = segment.get("script_batches", segment.get("script", []))
if isinstance(script_batches, list) and script_batches:
display_script = "\n\n---\n\n".join(str(b) for b in script_batches)
else:
display_script = str(raw_script) if raw_script else ""
total = radio_state.get("news_total_batches", 1) or 1
progress = f"News batch {batch_index}/{total}"
# When no more batches, clear counters so next click advances to next segment
if not radio_state["current_news_batches"]:
radio_state["news_total_batches"] = 0
radio_state["news_batches_played"] = 0
return segment_info, host_audio_file, "", progress, get_now_playing(segment), display_script, gr.Timer(value=0, active=False)
if radio_state["current_segment_index"] >= len(radio_state["planned_show"]):
radio_state["is_playing"] = False
return "π Show completed! Hope you enjoyed it.", None, None, None, "", "", gr.Timer(value=0, active=False)
# Get current segment
segment_index = radio_state["current_segment_index"]
segment = radio_state["planned_show"][segment_index]
radio_state["current_segment_index"] += 1
# Lazily generate LLM content for this segment
user_prefs = radio_state["user_preferences"].copy()
user_id = radio_state.get("user_id")
if user_id:
user_prefs["_user_id"] = user_id # Pass user_id in preferences for RAG queries
segment = agent.enrich_segment(segment, user_prefs)
radio_state["planned_show"][segment_index] = segment
# Execute segment (log to RAG)
agent.execute_segment(segment, user_id=user_id)
radio_state["last_segment"] = segment
# Generate content display and underlying script text
segment_info = format_segment_info(segment)
raw_script = get_segment_text(segment)
# Generate TTS for host commentary
host_audio_file = None
music_player_html = ""
if segment["type"] in ["intro", "outro", "story"]:
text = raw_script
if text and getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(text)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
elif segment["type"] == "news":
# Handle batched news generation
script_batches = segment.get("script_batches", segment.get("script", []))
if isinstance(script_batches, list) and script_batches:
# Generate first batch immediately (user doesn't wait)
first_batch = script_batches[0]
if getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(first_batch)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_batch_1.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
# Store remaining batches for sequential playback
segment["remaining_batches"] = script_batches[1:] if len(script_batches) > 1 else []
else:
# Fallback for old format
text = raw_script
if text and getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(text)
if audio_bytes:
host_audio_file = f"segment_{radio_state['current_segment_index']}.mp3"
tts_service.save_audio(audio_bytes, host_audio_file)
elif segment["type"] == "music":
# For music: generate host commentary, then get music
commentary = segment.get("commentary", "")
# Generate host commentary
if commentary and getattr(tts_service, "available", True):
audio_bytes = tts_service.text_to_speech(commentary)
if audio_bytes:
host_audio_file = os.path.join(AUDIO_DIR, f"segment_{radio_state['current_segment_index']}_host.mp3")
tts_service.save_audio(audio_bytes, host_audio_file)
# Get music track - check if we should play a liked track
track = segment.get("track", {})
user_id = radio_state.get("user_id")
# 30% chance to play a liked track if user has any
if user_id and user_memory.should_play_liked_track(user_id, probability=0.3):
genre = track.get("genre", "") if track else ""
liked_track = user_memory.get_random_liked_track(user_id, genre)
if liked_track:
print(f"β€οΈ Playing liked track: {liked_track.get('title', 'Unknown')}")
track = liked_track
segment["track"] = liked_track
segment["from_liked"] = True
if track:
# Store current track for like/dislike buttons
radio_state["current_track"] = track
# Add to user history
if user_id:
user_memory.add_to_history(user_id, track)
print(f"π΅ Preparing music: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}")
# Get streaming URL for YouTube or SoundCloud
# Modal proxy returns "youtube_api" as source, so check both
if track.get("source") in ["youtube", "youtube_api"] and track.get("url"):
try:
# Extract YouTube ID
youtube_id = track.get("youtube_id", "")
if not youtube_id and "v=" in track["url"]:
youtube_id = track["url"].split("v=")[-1].split("&")[0]
if youtube_id:
# # COMMENTED OUT: Delayed loading mechanism
# # Get host audio duration to calculate delay
# delay_ms = 0
# delay_s = 0.0
# if host_audio_file and os.path.exists(host_audio_file):
# try:
# audio = AudioSegment.from_file(host_audio_file)
# duration_ms = len(audio)
# delay_ms = max(0, duration_ms - 3000) # Start 3 seconds before end
# delay_s = delay_ms / 1000.0
# print(f"β±οΈ Host audio: {duration_ms}ms, YouTube delay: {delay_ms}ms")
# except Exception as e:
# print(f"β οΈ Could not get audio duration: {e}")
# # Store the actual iframe HTML for later (after host speech ends)
# actual_player_html = create_youtube_player_html(
# youtube_id,
# track.get('title', 'Unknown'),
# track.get('artist', 'Unknown'),
# track['url']
# )
# # Schedule the player to be ready after the delay
# radio_state["pending_player"] = {
# "html": actual_player_html,
# "ready_at": time.time() + delay_s,
# "type": "music",
# "title": track.get('title', 'Unknown')
# }
# # Return placeholder initially (no iframe yet!)
# music_player_html = create_placeholder_html(
# track.get('title', 'Unknown'),
# track.get('artist', 'Unknown'),
# delay_s,
# "music"
# )
# print(f"β YouTube player scheduled: {track.get('title', 'Unknown')} (ID: {youtube_id}) - iframe added after {delay_s:.1f}s")
# Simple immediate iframe - start track with host
music_player_html = create_youtube_player_html(
youtube_id,
track.get('title', 'Unknown'),
track.get('artist', 'Unknown'),
track['url']
)
print(f"β YouTube player created immediately: {track.get('title', 'Unknown')} (ID: {youtube_id})")
else:
# Fallback: Just show link
music_player_html = f"""