# AI-RADIO / src/radio_agent.py
# Nikita Makarov
# v2.4
# 9b6a0be
"""AI Radio Agent with Autonomous Behavior"""
import json
import random
import logging
import os
from typing import Dict, Any, List, Generator
from datetime import datetime
from openai import OpenAI
from mcp_servers.music_server import MusicMCPServer
from mcp_servers.news_server import NewsMCPServer
from mcp_servers.podcast_server import PodcastMCPServer
from rag_system import RadioRAGSystem
from user_memory import UserMemoryService
# -----------------------------------------------------------------------------
# LLM Logging Setup
# -----------------------------------------------------------------------------
# The project root is the parent of the directory that contains this file
# (src/); all LLM prompts/answers are written to <root>/logs/llm_answers.log.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(PROJECT_ROOT, "logs")
os.makedirs(LOG_DIR, exist_ok=True)

llm_logger = logging.getLogger("ai_radio.llm")
# Configure the logger only when it has no handlers yet, so importing this
# module more than once does not attach duplicate file handlers.
if not llm_logger.handlers:
    _fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    _fh = logging.FileHandler(
        os.path.join(LOG_DIR, "llm_answers.log"),
        encoding="utf-8",
    )
    _fh.setFormatter(_fmt)
    llm_logger.addHandler(_fh)
    llm_logger.setLevel(logging.INFO)
class RadioAgent:
"""Autonomous AI Radio Agent with planning, reasoning, and execution.

The agent plans a show as a list of lightweight segment dicts, fills each
segment in just before it is played (tracks, news and podcasts come from the
MCP servers; spoken text comes from the LLM client when one is available,
otherwise from canned fallback strings), and records executed segments in
the RAG system.
"""
def __init__(self, config):
    """Set up the LLM client, the MCP servers, the RAG system and agent state.

    Args:
        config: Settings object. This constructor reads ``nebius_api_key``,
            ``nebius_api_base`` and ``nebius_model`` from it and keeps the
            whole object on ``self.config`` for later use.
    """
    self.config = config

    # Nebius/OpenAI LLM (GPT-OSS-120B). The client stays None when no API key
    # is configured or when the connectivity probe fails; the generator
    # methods then return their canned fallback texts.
    llm_client = None
    if config.nebius_api_key:
        try:
            llm_client = OpenAI(
                api_key=config.nebius_api_key,
                base_url=config.nebius_api_base,
            )
            # Listing models is used purely as a connection test
            llm_client.models.list()
        except Exception as err:
            print(f"Warning: Could not initialize Nebius/OpenAI LLM: {err}")
            print("Radio agent will work in fallback mode with limited LLM features")
            llm_client = None
    self.client = llm_client

    # MCP servers that supply music, news and podcasts
    self.music_server = MusicMCPServer()
    self.news_server = NewsMCPServer()
    self.podcast_server = PodcastMCPServer()

    # RAG system (shares the Nebius credentials and model name)
    self.rag_system = RadioRAGSystem(
        config.nebius_api_key,
        config.nebius_api_base,
        config.nebius_model,
    )

    # Mutable agent state
    self.is_streaming = False
    self.current_segment = None
    self.segment_history = []
    self.current_user_id = None  # user whose data RAG queries are scoped to
def plan_radio_show(self, user_preferences: Dict[str, Any], duration_minutes: int = 30,
                    content_filter: Dict[str, bool] = None, user_id: str = None) -> List[Dict[str, Any]]:
    """
    Plan a personalized radio show as a list of lazily-filled segment skeletons.

    The plan always starts with an ``intro`` segment and ends with an ``outro``
    segment. In between there are exactly ``max(5, duration_minutes // 5)``
    content segments, split between the enabled content types according to
    the configured ratios and then shuffled. No track/news/podcast lookup and
    no LLM call happens here; each skeleton only carries ``None`` placeholders
    plus the merged preferences under ``'_prefs'``.

    Args:
        user_preferences: User's preferences and mood; these override the
            preferences stored in the RAG system.
        duration_minutes: Total duration of the show.
        content_filter: Dictionary with content type filters
            (music, news, podcasts, stories). ``None`` enables everything.
            If every type is disabled, the show falls back to music only.
        user_id: User ID for RAG queries; also stored on the agent.

    Returns:
        List of planned segments (intro, shuffled content segments, outro).
    """
    # Remember the user so later RAG queries can be scoped to them
    self.current_user_id = user_id

    # Default filter - all enabled
    if content_filter is None:
        content_filter = {"music": True, "news": True, "podcasts": True, "stories": True}

    # Stored preferences come first so the caller's values win on conflict.
    # Guard against a store (or caller) that hands back None.
    stored_prefs = self.rag_system.get_user_preferences(user_id=user_id) or {}
    merged_prefs = {**stored_prefs, **(user_preferences or {})}

    total_segments = max(5, duration_minutes // 5)

    # (filter key, segment type, config ratio attribute, minimum when enabled)
    type_specs = (
        ("music", "music", "music_ratio", 0),
        ("news", "news", "news_ratio", 0),
        ("podcasts", "podcast", "podcast_ratio", 0),
        ("stories", "story", "story_ratio", 1),
    )
    counts = {
        seg_type: max(minimum, int(total_segments * getattr(self.config, ratio_attr)))
        for filter_key, seg_type, ratio_attr, minimum in type_specs
        if content_filter.get(filter_key, True)
    }
    if not counts:
        # Everything was filtered out: fall back to a music-only show
        counts = {"music": 0}

    # Balance the counts so they add up to exactly total_segments.
    enabled = list(counts)
    shortfall = total_segments - sum(counts.values())
    if shortfall > 0:
        # Spread the missing slots evenly; the first few types absorb the
        # remainder so no slot is lost to integer division.
        share, extra = divmod(shortfall, len(enabled))
        for position, seg_type in enumerate(enabled):
            counts[seg_type] += share + (1 if position < extra else 0)
    while shortfall < 0:
        # Too many planned (the story minimum or ratios summing above 1):
        # trim one slot at a time from whichever type currently has the most.
        largest = max(enabled, key=counts.get)
        counts[largest] -= 1
        shortfall += 1

    segment_types = [seg_type for seg_type in enabled for _ in range(counts[seg_type])]
    # Shuffle for variety
    random.shuffle(segment_types)

    # Skeleton templates: real content is fetched/generated when the segment
    # is about to be played.
    skeletons = {
        'music': {'type': 'music', 'track': None, 'commentary': None, 'duration': 3},
        'news': {'type': 'news', 'news_items': None, 'script': None,
                 'script_batches': None, 'duration': 2},
        'podcast': {'type': 'podcast', 'podcast': None, 'intro': None, 'duration': 2},
        'story': {'type': 'story', 'content': None, 'duration': 2},
    }

    # Always open with an introduction
    segments = [{'type': 'intro', 'content': None, 'duration': 1}]
    for seg_type in segment_types:
        # '_prefs' keeps the merged preferences available for the lazy fetch
        segments.append({**skeletons[seg_type], '_prefs': merged_prefs})
    # ...and close with an outro
    segments.append({'type': 'outro', 'content': None, 'duration': 1})
    return segments
def _generate_intro(self, preferences: Dict[str, Any]) -> str:
    """Build the personalized opening greeting for the show.

    Uses the LLM when a client is available; without a client, or when the
    LLM call fails, a fixed greeting built from the time of day and the
    listener's name is returned instead.

    Args:
        preferences: Listener preferences; reads ``mood``, ``name``,
            ``interests`` and ``voice_name``.

    Returns:
        The greeting text to be spoken.
    """
    mood = preferences.get('mood', 'happy')
    name = preferences.get('name', 'friend')
    time_of_day = self._get_time_of_day()
    interests = preferences.get('interests', ['technology'])

    # The voice label looks like "Rachel (Female)"; the host name is the
    # part before the gender suffix.
    voice_name = preferences.get('voice_name', 'Rachel (Female)')
    host_name = voice_name if ' (' not in voice_name else voice_name.split(' (')[0]

    if self.client:
        try:
            topics = ', '.join(interests[:2])
            prompt = f"""You are a charismatic, entertaining radio host named {host_name}, you work on AI RADIO. Create a warm, engaging {time_of_day} greeting
for {name}. The listener is feeling {mood} and interested in {topics}.
Make it:
- 2-3 sentences long (about 10 seconds of speech, max_tokens = 200)
- Energetic and personal
- Include a fun fact or light joke related to their interests
- Make them excited to listen!
- Sound natural and conversational, like a real radio host"""
            completion = self.client.chat.completions.create(
                model=self.config.nebius_model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.9,
                max_tokens=200,
            )
            reply = completion.choices[0].message.content
            if not reply:
                return "Welcome to AI Radio! Let's get started with some great music."
            greeting = reply.strip()
            llm_logger.info(
                "INTRO | model=%s | prompt=%r | response=%r",
                self.config.nebius_model,
                prompt,
                greeting[:1000],  # truncate to avoid huge logs
            )
            return greeting
        except Exception as err:
            print(f"Error generating intro: {err}")

    # Fallback intro (no client, or the LLM call failed)
    return f"Good {time_of_day}, {name}! Welcome to your personal AI Radio station. We've got an amazing show lined up for you today! Did you know that music can actually boost your productivity? That's right! So sit back, relax, and let's get this party started!"
def _generate_outro(self, preferences: Dict[str, Any]) -> str:
    """Generate the personalized sign-off for the show.

    Uses the LLM when a client is available. When there is no client, the
    LLM returns empty content, or the call fails, the same fixed goodbye
    message is returned (previously an empty LLM reply produced the *intro*
    greeting as the sign-off).

    Args:
        preferences: Listener preferences; reads ``name``.

    Returns:
        The goodbye text to be spoken.
    """
    name = preferences.get('name', 'friend')
    fallback = f"That's all for now, {name}! Thanks for tuning in to AI Radio. Come back soon for more personalized content!"

    if not self.client:
        return fallback

    try:
        prompt = f"""You are a charismatic radio host wrapping up a show.
Create a warm, friendly goodbye message for {name}.
Thank them for listening and invite them back. Keep it 2-3 sentences. max_tokens = 200"""
        response = self.client.chat.completions.create(
            model=self.config.nebius_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.9,
            max_tokens=200,
        )
        content = response.choices[0].message.content
        if not content:
            # An empty reply must still end the show, not greet the listener
            return fallback
        text = content.strip()
        llm_logger.info(
            "OUTRO | model=%s | prompt=%r | response=%r",
            self.config.nebius_model,
            prompt,
            text[:1000],
        )
        return text
    except Exception as e:
        print(f"Error generating outro: {e}")
        return fallback
def _plan_music_segment(self, preferences: Dict[str, Any], user_id: str = None) -> Dict[str, Any]:
    """Pick a track for a music segment, avoiding tracks the user already heard.

    Genres and mood come from the RAG-stored preferences when available and
    fall back to ``preferences``. Up to five searches are made; after a
    search that yields no unplayed track, the genre that was just tried is
    dropped so the next attempt uses a different one. If every attempt fails,
    the first result of the last search is used even though it may be a
    repeat.

    Args:
        preferences: Listener preferences; reads ``favorite_genres`` and ``mood``.
        user_id: User whose RAG preferences and play history are consulted.
            When falsy, neither is looked up.

    Returns:
        A music segment dict; ``track`` is ``None`` when nothing was found and
        ``commentary`` is left for lazy generation.
    """
    # Get user preferences from RAG (more accurate than just preferences dict)
    rag_prefs = {}
    if user_id and self.rag_system:
        try:
            rag_prefs = self.rag_system.get_user_preferences(user_id) or {}
            print(f"🎵 [RAG] Retrieved preferences for user {user_id}: genres={rag_prefs.get('favorite_genres', [])}")
        except Exception as e:
            print(f"⚠️ [RAG] Could not get preferences: {e}")

    # Use RAG preferences if available, otherwise fall back to provided preferences
    genres = rag_prefs.get('favorite_genres') or preferences.get('favorite_genres') or ['pop']
    if isinstance(genres, str):
        genres = [genres]
    # Work on a copy: the retry loop below narrows this list
    genres = list(genres)
    mood = rag_prefs.get('mood') or preferences.get('mood', 'happy')

    # Get recently played tracks from user memory to avoid duplicates
    played_video_ids = set()
    if user_id:
        try:
            play_history = UserMemoryService().get_play_history(user_id)
            for entry in play_history:
                if entry.get('youtube_id'):
                    played_video_ids.add(entry['youtube_id'])
                elif entry.get('url') and 'youtube.com' in entry.get('url', ''):
                    # Extract the video ID from a "...?v=<id>&..." URL
                    url = entry['url']
                    if 'v=' in url:
                        played_video_ids.add(url.split('v=')[-1].split('&')[0])
            print(f"🎵 [RAG] Found {len(played_video_ids)} previously played tracks to avoid")
        except Exception as e:
            print(f"⚠️ Could not get play history: {e}")

    # Search for music, filtering out played tracks
    max_attempts = 5
    track = None
    tracks = []
    for attempt in range(max_attempts):
        tried_genre = random.choice(genres) if genres else 'pop'
        # Use MCP server to get music; ask for extra results to filter from
        tracks = self.music_server.search_free_music(
            genre=tried_genre,
            mood=mood,
            limit=10,
        ) or []
        available_tracks = [
            t for t in tracks
            if t.get('youtube_id') and t['youtube_id'] not in played_video_ids
        ]
        if available_tracks:
            track = random.choice(available_tracks)
            # Mark as recently played in music server to avoid duplicates
            self.music_server._add_to_recently_played(track['youtube_id'])
            print(f"✅ [RAG] Selected new track: {track.get('title', 'Unknown')} (not in history)")
            break
        print("⚠️ [RAG] All tracks already played, trying different genre...")
        if attempt < max_attempts - 1:
            # Drop exactly the genre that just came up empty
            genres = [g for g in genres if g != tried_genre] or ['pop']

    # If still no track, use any available (fallback)
    if not track and tracks:
        track = tracks[0]
        print(f"⚠️ Using track despite possible duplicate: {track.get('title', 'Unknown')}")

    return {
        'type': 'music',
        'track': track,
        'commentary': None,  # Will be generated lazily when segment is played
        'duration': 3
    }
def _plan_news_segment(self, preferences: Dict[str, Any]) -> Dict[str, Any]:
    """Plan a news segment using News MCP Server (LLM script generated later).

    Args:
        preferences: Listener preferences; reads ``interests``. A missing or
            empty list falls back to the ``'world'`` category.

    Returns:
        A news segment dict with the fetched ``news_items``; ``script`` and
        ``script_batches`` are left as ``None`` for lazy generation.
    """
    # `or` also covers an explicitly empty list, which random.choice rejects
    interests = preferences.get('interests') or ['world']
    category = random.choice(interests)
    # Use MCP server to get news
    news_items = self.news_server.fetch_news(category=category, limit=3)
    return {
        'type': 'news',
        'news_items': news_items,
        'script': None,
        'script_batches': None,  # Will be filled lazily
        'duration': 2  # Will be updated based on actual length
    }
def _plan_podcast_segment(self, preferences: Dict[str, Any]) -> Dict[str, Any]:
    """Plan a podcast segment using Podcast MCP Server (LLM intro later).

    Args:
        preferences: Listener preferences; reads ``podcast_interests``. A
            missing or empty list falls back to the ``'technology'`` category.

    Returns:
        A podcast segment dict; ``podcast`` is the first trending result or
        ``None`` when the server returned nothing, and ``intro`` is left for
        lazy generation.
    """
    # `or` also covers an explicitly empty list, which random.choice rejects
    interests = preferences.get('podcast_interests') or ['technology']
    category = random.choice(interests)
    # Use MCP server to get podcasts
    podcasts = self.podcast_server.get_trending_podcasts(category=category, limit=1)
    podcast = podcasts[0] if podcasts else None
    return {
        'type': 'podcast',
        'podcast': podcast,
        'intro': None,  # Will be generated lazily when segment is played
        'duration': 2
    }
def _plan_story_segment(self, preferences: Dict[str, Any]) -> Dict[str, Any]:
    """Return an empty story/fun-fact segment; its text is generated later.

    Args:
        preferences: Accepted for signature parity with the other planners;
            not read here.

    Returns:
        A story segment dict whose ``content`` is still ``None``.
    """
    story_skeleton = dict(type='story', content=None, duration=2)
    return story_skeleton
def _generate_music_commentary(self, track: Dict[str, Any], preferences: Dict[str, Any], rag_context: List[Dict[str, Any]] = None) -> str:
    """Generate the DJ's spoken introduction for a music track.

    Track fields are read with defaults, so a track dict that lacks
    ``title``, ``artist`` or ``genre`` still yields a commentary instead of
    raising from the error handler.

    Args:
        track: Track dict; reads ``title``, ``artist`` and ``genre``.
        preferences: Listener preferences; reads ``mood`` and ``voice_name``.
        rag_context: Optional context documents (dicts with ``text`` and
            ``score``); the top two are added to the prompt.

    Returns:
        The LLM commentary, a generic line when there is no track or no
        client, or a canned "Coming up" line when the LLM reply is empty or
        the call fails.
    """
    if not track or not self.client:
        return "Here's a great track for you!"

    mood = preferences.get('mood', 'happy')
    title = track.get('title', 'this track')
    artist = track.get('artist', 'Unknown Artist')
    genre = track.get('genre', 'music')
    # Built once up front so the error path cannot itself fail on a missing key
    fallback = f"Coming up: {title} by {artist}! This is a fantastic {genre} track that's perfect for your {mood} mood. Let's enjoy this one!"

    # Add RAG context to prompt if available
    rag_context_text = ""
    if rag_context:
        rag_context_text = "\n\nUser's listening history context (use this to personalize your commentary):\n"
        for i, ctx in enumerate(rag_context[:2]):  # Use top 2 most relevant
            ctx_text = ctx.get('text', '')[:200]
            score = ctx.get('score', 0)
            rag_context_text += f"- Context #{i+1} (relevance: {score:.4f}): {ctx_text}\n"
        llm_logger.info(
            "RAG_CONTEXT | track=%s | context_docs=%d | context_preview=%s",
            title,
            len(rag_context),
            rag_context_text[:300]
        )
        print(f" 📝 [RAG] Added {len(rag_context)} context documents to LLM prompt")
    else:
        print(" ℹ️ [RAG] No context available - using standard prompt")

    # Get voice name from preferences (remove gender suffix)
    voice_name = preferences.get('voice_name', 'Rachel (Female)')
    host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name

    try:
        prompt = f"""You are an energetic, entertaining radio DJ named {host_name}. This is a middle of a show. Introduce this song in a fun, engaging way:
Title: {title}
Artist: {artist}
Genre: {genre}
Listener mood: {mood}{rag_context_text}
Make it:
- About 10-15 seconds of speech, max_tokens = 400
- Include a fun fact about the genre, artist, or music in general
- Add a light joke or witty comment
- Be enthusiastic and engaging
- Sound natural, like a real radio host
- Connect it to the listener's mood if possible
- Reference their listening history if relevant (from context above)"""
        if rag_context:
            print(" ✅ [RAG] LLM prompt includes RAG context - model will use personalized history")
        response = self.client.chat.completions.create(
            model=self.config.nebius_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.9,
            max_tokens=400,
        )
        content = response.choices[0].message.content
        if not content:
            return fallback
        text = content.strip()

        # Heuristic check: does the reply share a history-related keyword
        # with any of the context documents?
        rag_used = False
        if rag_context:
            text_lower = text.lower()
            key_terms = ['listened', 'enjoyed', 'prefer', 'favorite', 'history', 'before', 'previous', 'similar']
            for ctx in rag_context:
                ctx_text = ctx.get('text', '').lower()
                if any(term in ctx_text and term in text_lower for term in key_terms):
                    rag_used = True
                    break
            if rag_used:
                print(" ✅ [RAG] LLM response appears to reference RAG context!")
                llm_logger.info("✅ RAG CONTEXT USED in commentary for %s", title)
            else:
                print(" ℹ️ [RAG] LLM response doesn't explicitly reference context (may still be influenced)")
        llm_logger.info(
            "MUSIC | model=%s | rag_used=%s | prompt_length=%d | response=%r",
            self.config.nebius_model,
            rag_used,
            len(prompt),
            text[:1000],
        )
        return text
    except Exception as e:
        # Report the failure like the intro/outro generators do, then fall back
        print(f"Error generating music commentary: {e}")
        return fallback
def _generate_news_script(self, news_items: List[Dict[str, Any]], preferences: Dict[str, Any]) -> List[str]:
    """Generate the spoken news script, split into batches for playback.

    With an LLM client, up to three items that have both ``title`` and
    ``summary`` are turned into one script, which is then cut at sentence
    boundaries (``.``, ``!`` or ``?`` followed by whitespace) into batches.
    A batch is closed once it holds at least 50 words. Sentences keep their
    own end punctuation, so nothing is doubled when the batches are rebuilt.

    Without a client, or when anything in the LLM path fails, a simple script
    listing the titles of the first two titled items is returned.

    Args:
        news_items: News item dicts; reads ``title`` and ``summary``.
        preferences: Listener preferences; reads ``voice_name``.

    Returns:
        A non-empty list of script batches.
    """
    import re  # local import: only the sentence splitting below needs it

    if not news_items:
        return ["That's all for news. Back to music!"]

    if self.client:
        try:
            # Only items with both a title and a summary can be presented
            valid_items = [
                item for item in news_items[:3]
                if item.get('title') and item.get('summary')
            ]
            if not valid_items:
                print("⚠️ No valid news items with title and summary, using fallback")
                raise ValueError("No valid news items")
            news_text = "\n".join([f"- {item['title']}: {item['summary']}" for item in valid_items])
            print(f"📰 Generating news script from {len(valid_items)} items")

            # Get voice name from preferences (remove gender suffix)
            voice_name = preferences.get('voice_name', 'Rachel (Female)')
            host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name

            prompt = f"""You are a professional radio news presenter named {host_name}. Say that this is news time on the show.
Present these news items in a conversational, engaging way. Text includes only what {host_name} needs to say. This should be about 1 minute of speech (about 150-200 words):
News items:
{news_text}
Requirements:
- Start by saying "It's news time on the show"
- Present each story in a conversational, engaging way
- Provide brief context for each story
- Use smooth transitions between stories
- Be informative but friendly and conversational
- Sound natural, like a real radio news anchor
- Keep it concise - about 150-200 words total"""
            print(f"📝 News prompt length: {len(prompt)} chars")
            response = self.client.chat.completions.create(
                model=self.config.nebius_model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.9,
                max_tokens=400,
            )

            # Check response structure; each failure drops to the fallback script
            if not response or not response.choices:
                print("⚠️ LLM returned empty response object")
                raise ValueError("Empty response object from LLM")
            if not response.choices[0].message:
                print("⚠️ LLM response has no message")
                raise ValueError("No message in LLM response")
            content = response.choices[0].message.content
            if not content:
                print("⚠️ LLM returned empty content for news script, using fallback")
                print(f" Response object: {response}")
                raise ValueError("Empty content from LLM")

            full_script = content.strip()
            print(f"✅ Generated news script: {len(full_script)} chars, {len(full_script.split())} words")
            llm_logger.info(
                "NEWS | model=%s | prompt=%r | response=%r",
                self.config.nebius_model,
                prompt,
                full_script[:1500],
            )

            # Split after sentence-ending punctuation, keeping the punctuation
            # with its sentence.
            sentences = [s for s in re.split(r'(?<=[.!?])\s+', full_script) if s]
            batches = []
            current = []
            current_words = 0
            for sentence in sentences:
                if current_words >= 50:
                    # Batch is long enough: close it and start the next one
                    batches.append(" ".join(current))
                    current, current_words = [], 0
                current.append(sentence)
                current_words += len(sentence.split())
            if current:
                batches.append(" ".join(current))
            return batches if batches else [full_script]
        except Exception as e:
            print(f"❌ Error generating news script: {e}")
            import traceback
            traceback.print_exc()

    # Fallback - create a simple script from available news items
    print("📰 Using fallback news script")
    titled_items = [item for item in news_items[:2] if item.get('title')]
    if not titled_items:
        return ["That's all for news. Back to music!"]
    script = "It's news time on the show. "
    script += "".join(f"{item['title']}. " for item in titled_items)
    script += "That's the news for now. Back to music!"
    return [script]
def _generate_podcast_intro(self, podcast: Dict[str, Any], preferences: Dict[str, Any]) -> str:
    """Generate the host's short introduction for a podcast.

    Podcast fields are read with defaults, so a podcast dict that lacks
    ``title``, ``host`` or ``description`` no longer raises ``KeyError`` from
    the error handler.

    Args:
        podcast: Podcast dict; reads ``title``, ``host`` and ``description``.
        preferences: Listener preferences; reads ``voice_name``.

    Returns:
        The LLM intro, a generic line when there is no podcast or no client,
        or a canned line naming the podcast when the LLM reply is empty or
        the call fails.
    """
    if not podcast or not self.client:
        return "Time for an interesting podcast!"

    title = podcast.get('title', 'Unknown')
    host = podcast.get('host')
    description = podcast.get('description', '')

    # Get voice name from preferences (remove gender suffix)
    voice_name = preferences.get('voice_name', 'Rachel (Female)')
    host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name

    try:
        prompt = f"""You are a radio host {host_name} introducing a podcast. This is a middle of the show. Create a brief, engaging intro (2-3 sentences, max_tokens = 200):
Podcast: {title}
Host: {host or 'Unknown'}
Description: {description}
Make listeners want to check it out!"""
        response = self.client.chat.completions.create(
            model=self.config.nebius_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.8,
            max_tokens=200,
        )
        content = response.choices[0].message.content
        if not content:
            return f"Here's an interesting podcast: {title}. Let's listen!"
        text = content.strip()
        llm_logger.info(
            "PODCAST | model=%s | prompt=%r | response=%r",
            self.config.nebius_model,
            prompt,
            text[:1000],
        )
        return text
    except Exception as e:
        # Report the failure like the intro/outro generators do, then fall back
        print(f"Error generating podcast intro: {e}")
        if host:
            return f"Check out {title} hosted by {host}!"
        # No host known: name only the podcast rather than inventing one
        return f"Check out {title}!"
def _generate_story(self, mood: str, interests: List[str], preferences: Dict[str, Any] = None) -> str:
    """Produce a short story or fun fact for a story segment.

    Args:
        mood: Tone word inserted into the prompt.
        interests: Topics to choose from at random; when empty, the topic is
            "general knowledge".
        preferences: Optional listener preferences; reads ``voice_name``.

    Returns:
        The LLM text, or a fixed fun fact when there is no client, the reply
        is empty, or the call fails.
    """
    if not self.client:
        return "Here's a fun fact: Music can boost your mood and productivity!"

    # Host name is the voice label without its gender suffix
    host_name = "Rachel"  # Default fallback
    if preferences:
        voice_name = preferences.get('voice_name', 'Rachel (Female)')
        host_name = voice_name if ' (' not in voice_name else voice_name.split(' (')[0]

    try:
        if interests:
            interest = random.choice(interests)
        else:
            interest = "general knowledge"
        prompt = f"""You are a radio host named {host_name}. Share a fascinating, {mood} story or fun fact about {interest}.
Keep it engaging and under 100-200 words. max_tokens = 600"""
        completion = self.client.chat.completions.create(
            model=self.config.nebius_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.9,
            max_tokens=600,
        )
        reply = completion.choices[0].message.content
        if not reply:
            return "Here's a fascinating story for you. Let me share it with you."
        story = reply.strip()
        llm_logger.info(
            "STORY | model=%s | prompt=%r | response=%r",
            self.config.nebius_model,
            prompt,
            story[:1000],
        )
        return story
    except Exception as err:
        return "Here's something interesting: The world's oldest radio station has been broadcasting since 1920!"
def generate_song_request_response(self, user_request: str, track: Dict[str, Any], preferences: Dict[str, Any] = None) -> str:
    """Write the host's short reply to a listener's spoken song request.

    Args:
        user_request: What the listener asked for; quoted in the prompt.
        track: The track that was found; reads ``title``, ``artist`` and
            ``genre`` (all optional).
        preferences: Optional listener preferences; reads ``voice_name``.

    Returns:
        The LLM reply, or a canned one-liner naming the track when there is
        no client, the reply is empty, or the call fails.
    """
    title = track.get('title', 'this track')
    artist = track.get('artist', 'Unknown Artist')
    genre = track.get('genre', '')

    # Host name is the voice label without its gender suffix
    host_name = "Rachel"  # Default fallback
    if preferences:
        voice_name = preferences.get('voice_name', 'Rachel (Female)')
        host_name = voice_name if ' (' not in voice_name else voice_name.split(' (')[0]

    canned_reply = f"Great choice! Here's '{title}' by {artist} coming right up!"
    if not self.client:
        return canned_reply

    try:
        genre_note = f' ({genre})' if genre else ''
        prompt = f"""You are an upbeat radio DJ named {host_name}. A listener just asked: "{user_request}"
You found the song "{title}" by {artist}{genre_note}.
Write a SHORT, enthusiastic 1-2 sentence response (max 200 tokens) introducing the song.
Be natural and fun - you can add a quick fact or joke if it fits.
Don't use emojis. Just speak naturally like a real DJ."""
        completion = self.client.chat.completions.create(
            model=self.config.nebius_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.9,
            max_tokens=200,  # Keep it short for fast response
        )
        reply = completion.choices[0].message.content
        if not reply:
            return canned_reply
        spoken = reply.strip()
        llm_logger.info(
            "SONG_REQUEST | model=%s | prompt=%r | response=%r",
            self.config.nebius_model,
            prompt,
            spoken[:500],
        )
        return spoken
    except Exception as err:
        print(f"Error generating song request response: {err}")
        return f"You got it! Here's '{title}' by {artist}!"
def _get_time_of_day(self) -> str:
    """Name the current part of the day from the local clock.

    Returns:
        ``"morning"`` before 12:00, ``"afternoon"`` from 12:00 until 18:00,
        and ``"evening"`` from 18:00 on.
    """
    current_hour = datetime.now().hour
    if current_hour >= 18:
        return "evening"
    if current_hour >= 12:
        return "afternoon"
    return "morning"
def execute_segment(self, segment: Dict[str, Any], user_id: str = None) -> Dict[str, Any]:
    """Mark a segment as the one being played and record it in the RAG system.

    Args:
        segment: The segment dict to execute; its ``type`` is used as the
            history item type.
        user_id: User to record the history for; when falsy, the user stored
            on the agent is used instead.

    Returns:
        The same segment object that was passed in.
    """
    self.current_segment = segment
    self.segment_history.append(segment)

    # Fall back to the user remembered on the agent
    effective_user = user_id or self.current_user_id

    self.rag_system.store_listening_history(
        item_type=segment['type'],
        item_data=segment,
        user_id=effective_user,
    )
    return segment
# ------------------------------------------------------------------
# Lazy LLM enrichment for segments
# ------------------------------------------------------------------
def enrich_segment(self, segment: Dict[str, Any], preferences: Dict[str, Any]) -> Dict[str, Any]:
    """
    Ensure the given segment has its content populated, just before playback.

    Intro/outro/story segments get their LLM text. Music, news and podcast
    segments first fetch the actual track/news/podcast if that has not been
    done yet (lazy loading) and then get their LLM commentary/script/intro.
    Fields that are already filled are left untouched.

    The preference dict is only read: genres suggested by the RAG context are
    collected in a private copy, because the same ``_prefs`` dict is shared by
    the segments of a planned show.

    Args:
        segment: Segment dict to fill in; modified in place.
        preferences: Preferences to use when the segment carries no
            ``_prefs`` of its own.

    Returns:
        The same segment dict, enriched.
    """
    seg_type = segment.get("type")
    # Use stored prefs if available (from skeleton), otherwise use passed prefs
    prefs = segment.get("_prefs", preferences)

    if seg_type == "intro":
        if not segment.get("content"):
            segment["content"] = self._generate_intro(prefs)
    elif seg_type == "outro":
        if not segment.get("content"):
            segment["content"] = self._generate_outro(prefs)
    elif seg_type == "music":
        # Lazy fetch track if not yet fetched
        if not segment.get("track"):
            # Copy the list: appending RAG suggestions to the original would
            # silently grow the user's shared favorite_genres.
            genres = list(prefs.get('favorite_genres') or ['pop'])
            mood = prefs.get('mood', 'happy')
            # Use stored user_id if available
            user_id = prefs.get('_user_id') or self.current_user_id
            # Query RAG for music preferences based on listening history
            rag_query = f"What music genres and styles does the user prefer? Favorite genres: {', '.join(genres)}, Mood: {mood}"
            print(f"\n🎵 [RAG] Querying for genre recommendations for user {user_id}...")
            rag_context = self.rag_system.query_user_context(rag_query, user_id=user_id, top_k=3)
            original_genres = list(genres)
            # Extract genre suggestions from RAG if available
            if rag_context:
                print(f"🎵 [RAG] Analyzing {len(rag_context)} context documents for genre suggestions...")
                known_genres = ('pop', 'rock', 'jazz', 'classical', 'electronic', 'hip-hop', 'country',
                                'indie', 'rap', 'blues', 'folk', 'metal', 'reggae', 'soul', 'r&b')
                for i, ctx in enumerate(rag_context):
                    ctx_text = ctx.get('text', '').lower()
                    score = ctx.get('score', 0)
                    # Look for genre mentions in context
                    for genre in known_genres:
                        if genre in ctx_text and genre not in genres:
                            genres.append(genre)
                            print(f" ✅ [RAG] Suggested genre from context #{i+1} (score: {score:.4f}): {genre}")
            else:
                print(f" ℹ️ [RAG] No context found, using original genres: {original_genres}")
            # Use RAG-enhanced genre selection
            selected_genre = random.choice(genres) if genres else 'pop'
            if selected_genre not in original_genres:
                print(f"🎵 [RAG] Using RAG-suggested genre: {selected_genre} (was: {original_genres})")
            else:
                print(f"🎵 [RAG] Using genre: {selected_genre} (from preferences)")
            # Update a copy of the preferences with the RAG-suggested genre
            updated_prefs = prefs.copy()
            if selected_genre not in updated_prefs.get('favorite_genres', []):
                updated_prefs['favorite_genres'] = updated_prefs.get('favorite_genres', []) + [selected_genre]
            # Use _plan_music_segment which handles RAG preferences and duplicate checking
            music_segment = self._plan_music_segment(updated_prefs, user_id=user_id)
            segment["track"] = music_segment.get("track")

        track = segment.get("track")
        if track and not segment.get("commentary"):
            # Use stored user_id if available
            user_id = prefs.get('_user_id') or self.current_user_id
            # Use RAG context for more personalized commentary
            rag_query = f"User's music listening history and preferences for {track.get('title', 'this track')} by {track.get('artist', 'this artist')}"
            print(f"\n🎤 [RAG] Querying for commentary context for user {user_id}: {track.get('title')} by {track.get('artist')}")
            rag_context = self.rag_system.query_user_context(rag_query, user_id=user_id, top_k=2)
            if rag_context:
                print(f"🎤 [RAG] Using {len(rag_context)} context documents for personalized commentary")
                for i, ctx in enumerate(rag_context):
                    score = ctx.get('score', 0)
                    preview = ctx.get('text', '')[:80].replace('\n', ' ')
                    print(f" 📄 Context #{i+1} (score: {score:.4f}): {preview}...")
            else:
                print(" ℹ️ [RAG] No context found, generating standard commentary")
            segment["commentary"] = self._generate_music_commentary(track, prefs, rag_context=rag_context)
    elif seg_type == "news":
        # Lazy fetch news if not yet fetched
        if not segment.get("news_items"):
            # `or` also covers an empty list, which random.choice rejects
            interests = prefs.get('interests') or ['world']
            category = random.choice(interests)
            segment["news_items"] = self.news_server.fetch_news(category=category, limit=3)
        news_items = segment.get("news_items", [])
        if news_items and not segment.get("script_batches"):
            script_batches = self._generate_news_script(news_items, prefs)
            segment["script_batches"] = script_batches
            segment["script"] = script_batches
    elif seg_type == "podcast":
        # Lazy fetch podcast if not yet fetched
        if not segment.get("podcast"):
            # `or` also covers an empty list, which random.choice rejects
            interests = prefs.get('podcast_interests') or ['technology']
            category = random.choice(interests)
            podcasts = self.podcast_server.get_trending_podcasts(category=category, limit=1)
            segment["podcast"] = podcasts[0] if podcasts else None
        podcast = segment.get("podcast")
        if podcast and not segment.get("intro"):
            segment["intro"] = self._generate_podcast_intro(podcast, prefs)
    elif seg_type == "story":
        if not segment.get("content"):
            mood = prefs.get("mood", "happy")
            interests = prefs.get("interests", ["technology"])
            segment["content"] = self._generate_story(mood, interests, prefs)
    return segment
def get_all_mcp_tools(self) -> List[Dict[str, Any]]:
    """Collect the tool definitions exposed by the three MCP servers.

    Returns:
        One flat list: music tools first, then news tools, then podcast tools.
    """
    servers = (self.music_server, self.news_server, self.podcast_server)
    return [
        tool
        for server in servers
        for tool in server.get_tools_definition()
    ]