Spaces:
Sleeping
Sleeping
| """AI Radio Agent with Autonomous Behavior""" | |
| import json | |
| import random | |
| import logging | |
| import os | |
| from typing import Dict, Any, List, Generator | |
| from datetime import datetime | |
| from openai import OpenAI | |
| from mcp_servers.music_server import MusicMCPServer | |
| from mcp_servers.news_server import NewsMCPServer | |
| from mcp_servers.podcast_server import PodcastMCPServer | |
| from rag_system import RadioRAGSystem | |
| from user_memory import UserMemoryService | |
| # ----------------------------------------------------------------------------- | |
| # LLM Logging Setup | |
| # ----------------------------------------------------------------------------- | |
| # Get project root directory (parent of src/) | |
| PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| LOG_DIR = os.path.join(PROJECT_ROOT, "logs") | |
| os.makedirs(LOG_DIR, exist_ok=True) | |
| llm_logger = logging.getLogger("ai_radio.llm") | |
| if not llm_logger.handlers: | |
| llm_logger.setLevel(logging.INFO) | |
| _fh = logging.FileHandler(os.path.join(LOG_DIR, "llm_answers.log"), encoding="utf-8") | |
| _fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") | |
| _fh.setFormatter(_fmt) | |
| llm_logger.addHandler(_fh) | |
| class RadioAgent: | |
| """Autonomous AI Radio Agent with planning, reasoning, and execution""" | |
| def __init__(self, config): | |
| self.config = config | |
| # Initialize Nebius/OpenAI LLM (GPT-OSS-120B) with error handling | |
| self.client = None | |
| if config.nebius_api_key: | |
| try: | |
| self.client = OpenAI( | |
| api_key=config.nebius_api_key, | |
| base_url=config.nebius_api_base | |
| ) | |
| # Test the connection | |
| self.client.models.list() | |
| except Exception as e: | |
| print(f"Warning: Could not initialize Nebius/OpenAI LLM: {e}") | |
| print("Radio agent will work in fallback mode with limited LLM features") | |
| self.client = None | |
| # Initialize MCP Servers | |
| self.music_server = MusicMCPServer() | |
| self.news_server = NewsMCPServer() | |
| self.podcast_server = PodcastMCPServer() | |
| # Initialize RAG System (using Nebius API key) | |
| self.rag_system = RadioRAGSystem(config.nebius_api_key, config.nebius_api_base, config.nebius_model) | |
| # Agent state | |
| self.is_streaming = False | |
| self.current_segment = None | |
| self.segment_history = [] | |
| self.current_user_id = None # Track current user for RAG queries | |
| def plan_radio_show(self, user_preferences: Dict[str, Any], duration_minutes: int = 30, | |
| content_filter: Dict[str, bool] = None, user_id: str = None) -> List[Dict[str, Any]]: | |
| """ | |
| Plan a personalized radio show based on user preferences | |
| This demonstrates autonomous planning behavior | |
| Args: | |
| user_preferences: User's preferences and mood | |
| duration_minutes: Total duration of the show | |
| content_filter: Dictionary with content type filters (music, news, podcasts, stories) | |
| user_id: User ID for RAG queries | |
| Returns: | |
| List of planned segments | |
| """ | |
| # Store user_id for later use | |
| self.current_user_id = user_id | |
| segments = [] | |
| # Default filter - all enabled | |
| if content_filter is None: | |
| content_filter = { | |
| "music": True, | |
| "news": True, | |
| "podcasts": True, | |
| "stories": True | |
| } | |
| # Get user preferences from RAG (filtered by user_id) | |
| stored_prefs = self.rag_system.get_user_preferences(user_id=user_id) | |
| merged_prefs = {**stored_prefs, **user_preferences} | |
| # Calculate segment distribution based on filter | |
| total_segments = max(5, duration_minutes // 5) | |
| music_count = int(total_segments * self.config.music_ratio) if content_filter.get("music", True) else 0 | |
| news_count = int(total_segments * self.config.news_ratio) if content_filter.get("news", True) else 0 | |
| podcast_count = int(total_segments * self.config.podcast_ratio) if content_filter.get("podcasts", True) else 0 | |
| story_count = max(1, int(total_segments * self.config.story_ratio)) if content_filter.get("stories", True) else 0 | |
| # Balance to total | |
| total_planned = music_count + news_count + podcast_count + story_count | |
| if total_planned == 0: | |
| # If all filtered out, enable music as default | |
| music_count = total_segments | |
| else: | |
| remaining = total_segments - total_planned | |
| # Distribute remaining to enabled types | |
| enabled_types = sum([content_filter.get("music", True), content_filter.get("news", True), | |
| content_filter.get("podcasts", True), content_filter.get("stories", True)]) | |
| if enabled_types > 0: | |
| per_type = remaining // enabled_types | |
| if content_filter.get("music", True): | |
| music_count += per_type | |
| if content_filter.get("news", True): | |
| news_count += per_type | |
| if content_filter.get("podcasts", True): | |
| podcast_count += per_type | |
| if content_filter.get("stories", True): | |
| story_count += per_type | |
| # Create segment plan | |
| segment_types = [] | |
| if content_filter.get("music", True): | |
| segment_types.extend(['music'] * music_count) | |
| if content_filter.get("news", True): | |
| segment_types.extend(['news'] * news_count) | |
| if content_filter.get("podcasts", True): | |
| segment_types.extend(['podcast'] * podcast_count) | |
| if content_filter.get("stories", True): | |
| segment_types.extend(['story'] * story_count) | |
| # Shuffle for variety | |
| random.shuffle(segment_types) | |
| # Ensure we start with an introduction | |
| segments.append({ | |
| 'type': 'intro', | |
| 'content': None, # Will be generated lazily when segment is played | |
| 'duration': 1 | |
| }) | |
| # Generate each segment as a SKELETON (no YouTube search, no LLM yet) | |
| # Actual content will be fetched lazily when the segment is played | |
| for seg_type in segment_types: | |
| if seg_type == 'music': | |
| # Skeleton: just record the type and preferences; track will be fetched lazily | |
| segments.append({ | |
| 'type': 'music', | |
| 'track': None, # Will be fetched when segment is played | |
| 'commentary': None, | |
| 'duration': 3, | |
| '_prefs': merged_prefs # Store prefs for lazy fetch | |
| }) | |
| elif seg_type == 'news': | |
| # Skeleton: just record the type; news will be fetched lazily | |
| segments.append({ | |
| 'type': 'news', | |
| 'news_items': None, | |
| 'script': None, | |
| 'script_batches': None, | |
| 'duration': 2, | |
| '_prefs': merged_prefs | |
| }) | |
| elif seg_type == 'podcast': | |
| # Skeleton: just record the type; podcast will be fetched lazily | |
| segments.append({ | |
| 'type': 'podcast', | |
| 'podcast': None, | |
| 'intro': None, | |
| 'duration': 2, | |
| '_prefs': merged_prefs | |
| }) | |
| elif seg_type == 'story': | |
| # Skeleton: just record the type; story will be generated lazily | |
| segments.append({ | |
| 'type': 'story', | |
| 'content': None, | |
| 'duration': 2, | |
| '_prefs': merged_prefs | |
| }) | |
| # Add outro | |
| segments.append({ | |
| 'type': 'outro', | |
| 'content': None, # Will be generated lazily when segment is played | |
| 'duration': 1 | |
| }) | |
| return segments | |
| def _generate_intro(self, preferences: Dict[str, Any]) -> str: | |
| """Generate personalized radio intro - longer and more engaging""" | |
| mood = preferences.get('mood', 'happy') | |
| name = preferences.get('name', 'friend') | |
| time_of_day = self._get_time_of_day() | |
| interests = preferences.get('interests', ['technology']) | |
| # Get voice name from preferences (remove gender suffix) | |
| voice_name = preferences.get('voice_name', 'Rachel (Female)') | |
| host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name | |
| if self.client: | |
| try: | |
| prompt = f"""You are a charismatic, entertaining radio host named {host_name}, you work on AI RADIO. Create a warm, engaging {time_of_day} greeting | |
| for {name}. The listener is feeling {mood} and interested in {', '.join(interests[:2])}. | |
| Make it: | |
| - 2-3 sentences long (about 10 seconds of speech, max_tokens = 200) | |
| - Energetic and personal | |
| - Include a fun fact or light joke related to their interests | |
| - Make them excited to listen! | |
| - Sound natural and conversational, like a real radio host""" | |
| response = self.client.chat.completions.create( | |
| model=self.config.nebius_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.9, | |
| max_tokens=200 #200 | |
| ) | |
| content = response.choices[0].message.content | |
| if not content: | |
| return "Welcome to AI Radio! Let's get started with some great music." | |
| text = content.strip() | |
| llm_logger.info( | |
| "INTRO | model=%s | prompt=%r | response=%r", | |
| self.config.nebius_model, | |
| prompt, | |
| text[:1000], # truncate to avoid huge logs | |
| ) | |
| return text | |
| except Exception as e: | |
| print(f"Error generating intro: {e}") | |
| # Fallback intro | |
| return f"Good {time_of_day}, {name}! Welcome to your personal AI Radio station. We've got an amazing show lined up for you today! Did you know that music can actually boost your productivity? That's right! So sit back, relax, and let's get this party started!" | |
| def _generate_outro(self, preferences: Dict[str, Any]) -> str: | |
| """Generate personalized radio outro""" | |
| name = preferences.get('name', 'friend') | |
| if self.client: | |
| try: | |
| prompt = f"""You are a charismatic radio host wrapping up a show. | |
| Create a warm, friendly goodbye message for {name}. | |
| Thank them for listening and invite them back. Keep it 2-3 sentences. max_tokens = 200""" | |
| response = self.client.chat.completions.create( | |
| model=self.config.nebius_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.9, | |
| max_tokens=200 #200 | |
| ) | |
| content = response.choices[0].message.content | |
| if not content: | |
| return "Welcome to AI Radio! Let's get started with some great music." | |
| text = content.strip() | |
| llm_logger.info( | |
| "OUTRO | model=%s | prompt=%r | response=%r", | |
| self.config.nebius_model, | |
| prompt, | |
| text[:1000], | |
| ) | |
| return text | |
| except Exception as e: | |
| print(f"Error generating outro: {e}") | |
| return f"That's all for now, {name}! Thanks for tuning in to AI Radio. Come back soon for more personalized content!" | |
| def _plan_music_segment(self, preferences: Dict[str, Any], user_id: str = None) -> Dict[str, Any]: | |
| """Plan a music segment using Music MCP Server with RAG preferences""" | |
| # Get user preferences from RAG (more accurate than just preferences dict) | |
| rag_prefs = {} | |
| if user_id and self.rag_system: | |
| try: | |
| rag_prefs = self.rag_system.get_user_preferences(user_id) | |
| print(f"🎵 [RAG] Retrieved preferences for user {user_id}: genres={rag_prefs.get('favorite_genres', [])}") | |
| except Exception as e: | |
| print(f"⚠️ [RAG] Could not get preferences: {e}") | |
| # Use RAG preferences if available, otherwise fall back to provided preferences | |
| genres = rag_prefs.get('favorite_genres') or preferences.get('favorite_genres', ['pop']) | |
| mood = rag_prefs.get('mood') or preferences.get('mood', 'happy') | |
| # Get recently played tracks from user memory to avoid duplicates | |
| played_video_ids = set() | |
| if user_id: | |
| try: | |
| user_memory = UserMemoryService() | |
| play_history = user_memory.get_play_history(user_id) | |
| # Extract YouTube IDs from history | |
| for entry in play_history: | |
| if entry.get('youtube_id'): | |
| played_video_ids.add(entry['youtube_id']) | |
| elif entry.get('url') and 'youtube.com' in entry.get('url', ''): | |
| # Extract ID from URL | |
| url = entry['url'] | |
| if 'v=' in url: | |
| vid_id = url.split('v=')[-1].split('&')[0] | |
| played_video_ids.add(vid_id) | |
| print(f"🎵 [RAG] Found {len(played_video_ids)} previously played tracks to avoid") | |
| except Exception as e: | |
| print(f"⚠️ Could not get play history: {e}") | |
| # Search for music, filtering out played tracks | |
| max_attempts = 5 | |
| track = None | |
| for attempt in range(max_attempts): | |
| # Use MCP server to get music | |
| tracks = self.music_server.search_free_music( | |
| genre=random.choice(genres) if genres else 'pop', | |
| mood=mood, | |
| limit=10 # Get more results to filter from | |
| ) | |
| # Filter out already played tracks | |
| available_tracks = [ | |
| t for t in tracks | |
| if t.get('youtube_id') and t['youtube_id'] not in played_video_ids | |
| ] | |
| if available_tracks: | |
| track = random.choice(available_tracks) | |
| # Mark as recently played in music server to avoid duplicates | |
| if track.get('youtube_id'): | |
| self.music_server._add_to_recently_played(track['youtube_id']) | |
| print(f"✅ [RAG] Selected new track: {track.get('title', 'Unknown')} (not in history)") | |
| break | |
| else: | |
| print(f"⚠️ [RAG] All tracks already played, trying different genre...") | |
| # Try a different genre if all tracks were played | |
| if attempt < max_attempts - 1: | |
| genres = [g for g in genres if g != random.choice(genres)] or ['pop'] | |
| # If still no track, use any available (fallback) | |
| if not track and tracks: | |
| track = tracks[0] | |
| print(f"⚠️ Using track despite possible duplicate: {track.get('title', 'Unknown')}") | |
| return { | |
| 'type': 'music', | |
| 'track': track, | |
| 'commentary': None, # Will be generated lazily when segment is played | |
| 'duration': 3 | |
| } | |
| def _plan_news_segment(self, preferences: Dict[str, Any]) -> Dict[str, Any]: | |
| """Plan a news segment using News MCP Server (LLM script generated later)""" | |
| interests = preferences.get('interests', ['world']) | |
| category = random.choice(interests) | |
| # Use MCP server to get news | |
| news_items = self.news_server.fetch_news(category=category, limit=3) | |
| return { | |
| 'type': 'news', | |
| 'news_items': news_items, | |
| 'script': None, | |
| 'script_batches': None, # Will be filled lazily | |
| 'duration': 2 # Will be updated based on actual length | |
| } | |
| def _plan_podcast_segment(self, preferences: Dict[str, Any]) -> Dict[str, Any]: | |
| """Plan a podcast segment using Podcast MCP Server (LLM intro later)""" | |
| interests = preferences.get('podcast_interests', ['technology']) | |
| category = random.choice(interests) | |
| # Use MCP server to get podcasts | |
| podcasts = self.podcast_server.get_trending_podcasts(category=category, limit=1) | |
| podcast = podcasts[0] if podcasts else None | |
| return { | |
| 'type': 'podcast', | |
| 'podcast': podcast, | |
| 'intro': None, # Will be generated lazily when segment is played | |
| 'duration': 2 | |
| } | |
| def _plan_story_segment(self, preferences: Dict[str, Any]) -> Dict[str, Any]: | |
| """Plan a story/fun fact segment (LLM later)""" | |
| return { | |
| 'type': 'story', | |
| 'content': None, # Will be generated lazily when segment is played | |
| 'duration': 2 | |
| } | |
| def _generate_music_commentary(self, track: Dict[str, Any], preferences: Dict[str, Any], rag_context: List[Dict[str, Any]] = None) -> str: | |
| """Generate longer, more engaging commentary for music track with jokes/facts""" | |
| if not track or not self.client: | |
| return f"Here's a great track for you!" | |
| mood = preferences.get('mood', 'happy') | |
| interests = preferences.get('interests', []) | |
| # Add RAG context to prompt if available | |
| rag_context_text = "" | |
| if rag_context: | |
| rag_context_text = "\n\nUser's listening history context (use this to personalize your commentary):\n" | |
| for i, ctx in enumerate(rag_context[:2]): # Use top 2 most relevant | |
| ctx_text = ctx.get('text', '')[:200] | |
| score = ctx.get('score', 0) | |
| rag_context_text += f"- Context #{i+1} (relevance: {score:.4f}): {ctx_text}\n" | |
| llm_logger.info( | |
| "RAG_CONTEXT | track=%s | context_docs=%d | context_preview=%s", | |
| track.get('title', 'Unknown'), | |
| len(rag_context), | |
| rag_context_text[:300] | |
| ) | |
| print(f" 📝 [RAG] Added {len(rag_context)} context documents to LLM prompt") | |
| else: | |
| print(f" ℹ️ [RAG] No context available - using standard prompt") | |
| # Get voice name from preferences (remove gender suffix) | |
| voice_name = preferences.get('voice_name', 'Rachel (Female)') | |
| host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name | |
| try: | |
| prompt = f"""You are an energetic, entertaining radio DJ named {host_name}. This is a middle of a show. Introduce this song in a fun, engaging way: | |
| Title: {track['title']} | |
| Artist: {track['artist']} | |
| Genre: {track['genre']} | |
| Listener mood: {mood}{rag_context_text} | |
| Make it: | |
| - About 10-15 seconds of speech, max_tokens = 400 | |
| - Include a fun fact about the genre, artist, or music in general | |
| - Add a light joke or witty comment | |
| - Be enthusiastic and engaging | |
| - Sound natural, like a real radio host | |
| - Connect it to the listener's mood if possible | |
| - Reference their listening history if relevant (from context above)""" | |
| if rag_context: | |
| print(f" ✅ [RAG] LLM prompt includes RAG context - model will use personalized history") | |
| response = self.client.chat.completions.create( | |
| model=self.config.nebius_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.9, | |
| max_tokens=400 #400 | |
| ) | |
| content = response.choices[0].message.content | |
| if not content: | |
| return f"Coming up: {track['title']} by {track['artist']}! This is a fantastic {track['genre']} track that's perfect for your {mood} mood. Let's enjoy this one!" | |
| text = content.strip() | |
| # Check if response references RAG context | |
| rag_used = False | |
| if rag_context: | |
| # Check if response mentions anything from context | |
| text_lower = text.lower() | |
| for ctx in rag_context: | |
| ctx_text = ctx.get('text', '').lower() | |
| # Look for key terms from context in response | |
| key_terms = ['listened', 'enjoyed', 'prefer', 'favorite', 'history', 'before', 'previous', 'similar'] | |
| if any(term in ctx_text and term in text_lower for term in key_terms): | |
| rag_used = True | |
| break | |
| if rag_used: | |
| print(f" ✅ [RAG] LLM response appears to reference RAG context!") | |
| llm_logger.info(f"✅ RAG CONTEXT USED in commentary for {track.get('title')}") | |
| else: | |
| print(f" ℹ️ [RAG] LLM response doesn't explicitly reference context (may still be influenced)") | |
| llm_logger.info( | |
| "MUSIC | model=%s | rag_used=%s | prompt_length=%d | response=%r", | |
| self.config.nebius_model, | |
| rag_used, | |
| len(prompt), | |
| text[:1000], | |
| ) | |
| return text | |
| except Exception as e: | |
| return f"Coming up: {track['title']} by {track['artist']}! This is a fantastic {track['genre']} track that's perfect for your {mood} mood. Let's enjoy this one!" | |
| def _generate_news_script(self, news_items: List[Dict[str, Any]], preferences: Dict[str, Any]) -> List[str]: | |
| """Generate news segment script in batches (1-2 minutes total, ~200-250 words)""" | |
| if not news_items: | |
| return ["That's all for news. Back to music!"] | |
| if self.client: | |
| try: | |
| # Generate full news script (1-2 minutes of speech) | |
| # Check if news items have required fields | |
| valid_items = [] | |
| for item in news_items[:3]: | |
| if item.get('title') and item.get('summary'): | |
| valid_items.append(item) | |
| if not valid_items: | |
| print("⚠️ No valid news items with title and summary, using fallback") | |
| raise ValueError("No valid news items") | |
| news_text = "\n".join([f"- {item['title']}: {item['summary']}" for item in valid_items]) | |
| print(f"📰 Generating news script from {len(valid_items)} items") | |
| # Get voice name from preferences (remove gender suffix) | |
| voice_name = preferences.get('voice_name', 'Rachel (Female)') | |
| host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name | |
| prompt = f"""You are a professional radio news presenter named {host_name}. Say that this is news time on the show. | |
| Present these news items in a conversational, engaging way. Text includes only what {host_name} needs to say. This should be about 1 minute of speech (about 150-200 words): | |
| News items: | |
| {news_text} | |
| Requirements: | |
| - Start by saying "It's news time on the show" | |
| - Present each story in a conversational, engaging way | |
| - Provide brief context for each story | |
| - Use smooth transitions between stories | |
| - Be informative but friendly and conversational | |
| - Sound natural, like a real radio news anchor | |
| - Keep it concise - about 150-200 words total""" | |
| print(f"📝 News prompt length: {len(prompt)} chars") | |
| response = self.client.chat.completions.create( | |
| model=self.config.nebius_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.9, | |
| max_tokens=400 # Increased from 100 to allow proper response | |
| ) | |
| # Check response structure | |
| if not response or not response.choices: | |
| print("⚠️ LLM returned empty response object") | |
| raise ValueError("Empty response object from LLM") | |
| if not response.choices[0].message: | |
| print("⚠️ LLM response has no message") | |
| raise ValueError("No message in LLM response") | |
| content = response.choices[0].message.content | |
| if not content: | |
| print("⚠️ LLM returned empty content for news script, using fallback") | |
| print(f" Response object: {response}") | |
| raise ValueError("Empty content from LLM") | |
| full_script = content.strip() | |
| print(f"✅ Generated news script: {len(full_script)} chars, {len(full_script.split())} words") | |
| llm_logger.info( | |
| "NEWS | model=%s | prompt=%r | response=%r", | |
| self.config.nebius_model, | |
| prompt, | |
| full_script[:1500], | |
| ) | |
| # Split into batches (approximately 50-60 words each for smooth playback) | |
| # This allows streaming without waiting for full generation | |
| sentences = full_script.split('. ') | |
| batches = [] | |
| current_batch = "" | |
| for sentence in sentences: | |
| if len(current_batch.split()) < 50: | |
| current_batch += sentence + ". " | |
| else: | |
| batches.append(current_batch.strip()) | |
| current_batch = sentence + ". " | |
| if current_batch: | |
| batches.append(current_batch.strip()) | |
| return batches if batches else [full_script] | |
| except Exception as e: | |
| print(f"❌ Error generating news script: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| # Fallback - create a simple script from available news items | |
| print("📰 Using fallback news script") | |
| script = "It's news time on the show. " | |
| valid_items = [item for item in news_items[:2] if item.get('title')] | |
| if valid_items: | |
| for item in valid_items: | |
| title = item.get('title', 'News story') | |
| script += f"{title}. " | |
| script += "That's the news for now. Back to music!" | |
| else: | |
| script = "That's all for news. Back to music!" | |
| return [script] | |
| def _generate_podcast_intro(self, podcast: Dict[str, Any], preferences: Dict[str, Any]) -> str: | |
| """Generate podcast introduction""" | |
| if not podcast or not self.client: | |
| return "Time for an interesting podcast!" | |
| # Get voice name from preferences (remove gender suffix) | |
| voice_name = preferences.get('voice_name', 'Rachel (Female)') | |
| host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name | |
| try: | |
| prompt = f"""You are a radio host {host_name} introducing a podcast. This is a middle of the show. Create a brief, engaging intro (2-3 sentences, max_tokens = 200): | |
| Podcast: {podcast['title']} | |
| Host: {podcast['host']} | |
| Description: {podcast['description']} | |
| Make listeners want to check it out!""" | |
| response = self.client.chat.completions.create( | |
| model=self.config.nebius_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.8, | |
| max_tokens=200 #200 | |
| ) | |
| content = response.choices[0].message.content | |
| if not content: | |
| return f"Here's an interesting podcast: {podcast.get('title', 'Unknown')}. Let's listen!" | |
| text = content.strip() | |
| llm_logger.info( | |
| "PODCAST | model=%s | prompt=%r | response=%r", | |
| self.config.nebius_model, | |
| prompt, | |
| text[:1000], | |
| ) | |
| return text | |
| except Exception as e: | |
| return f"Check out {podcast['title']} hosted by {podcast['host']}!" | |
| def _generate_story(self, mood: str, interests: List[str], preferences: Dict[str, Any] = None) -> str: | |
| """Generate an interesting story or fun fact""" | |
| if not self.client: | |
| return "Here's a fun fact: Music can boost your mood and productivity!" | |
| # Get voice name from preferences (remove gender suffix) | |
| if preferences: | |
| voice_name = preferences.get('voice_name', 'Rachel (Female)') | |
| host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name | |
| else: | |
| host_name = "Rachel" # Default fallback | |
| try: | |
| interest = random.choice(interests) if interests else "general knowledge" | |
| prompt = f"""You are a radio host named {host_name}. Share a fascinating, {mood} story or fun fact about {interest}. | |
| Keep it engaging and under 100-200 words. max_tokens = 600""" | |
| response = self.client.chat.completions.create( | |
| model=self.config.nebius_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.9, | |
| max_tokens=600 #600 | |
| ) | |
| content = response.choices[0].message.content | |
| if not content: | |
| return "Here's a fascinating story for you. Let me share it with you." | |
| text = content.strip() | |
| llm_logger.info( | |
| "STORY | model=%s | prompt=%r | response=%r", | |
| self.config.nebius_model, | |
| prompt, | |
| text[:1000], | |
| ) | |
| return text | |
| except Exception as e: | |
| return "Here's something interesting: The world's oldest radio station has been broadcasting since 1920!" | |
| def generate_song_request_response(self, user_request: str, track: Dict[str, Any], preferences: Dict[str, Any] = None) -> str: | |
| """Generate a short, engaging host response for a voice song request""" | |
| title = track.get('title', 'this track') | |
| artist = track.get('artist', 'Unknown Artist') | |
| genre = track.get('genre', '') | |
| # Get voice name from preferences (remove gender suffix) | |
| if preferences: | |
| voice_name = preferences.get('voice_name', 'Rachel (Female)') | |
| host_name = voice_name.split(' (')[0] if ' (' in voice_name else voice_name | |
| else: | |
| host_name = "Rachel" # Default fallback | |
| if not self.client: | |
| return f"Great choice! Here's '{title}' by {artist} coming right up!" | |
| try: | |
| prompt = f"""You are an upbeat radio DJ named {host_name}. A listener just asked: "{user_request}" | |
| You found the song "{title}" by {artist}{f' ({genre})' if genre else ''}. | |
| Write a SHORT, enthusiastic 1-2 sentence response (max 200 tokens) introducing the song. | |
| Be natural and fun - you can add a quick fact or joke if it fits. | |
| Don't use emojis. Just speak naturally like a real DJ.""" | |
| response = self.client.chat.completions.create( | |
| model=self.config.nebius_model, | |
| messages=[{"role": "user", "content": prompt}], | |
| temperature=0.9, | |
| max_tokens=200 #200 # Keep it short for fast response | |
| ) | |
| content = response.choices[0].message.content | |
| if not content: | |
| return f"Great choice! Here's '{title}' by {artist} coming right up!" | |
| text = content.strip() | |
| llm_logger.info( | |
| "SONG_REQUEST | model=%s | prompt=%r | response=%r", | |
| self.config.nebius_model, | |
| prompt, | |
| text[:500], | |
| ) | |
| return text | |
| except Exception as e: | |
| print(f"Error generating song request response: {e}") | |
| return f"You got it! Here's '{title}' by {artist}!" | |
| def _get_time_of_day(self) -> str: | |
| """Get appropriate greeting based on time""" | |
| hour = datetime.now().hour | |
| if hour < 12: | |
| return "morning" | |
| elif hour < 18: | |
| return "afternoon" | |
| else: | |
| return "evening" | |
| def execute_segment(self, segment: Dict[str, Any], user_id: str = None) -> Dict[str, Any]: | |
| """Execute a planned segment and log to RAG""" | |
| self.current_segment = segment | |
| self.segment_history.append(segment) | |
| # Use stored user_id if not provided | |
| if not user_id: | |
| user_id = self.current_user_id | |
| # Store in RAG system with user_id | |
| self.rag_system.store_listening_history( | |
| item_type=segment['type'], | |
| item_data=segment, | |
| user_id=user_id | |
| ) | |
| return segment | |
| # ------------------------------------------------------------------ | |
| # Lazy LLM enrichment for segments | |
| # ------------------------------------------------------------------ | |
| def enrich_segment(self, segment: Dict[str, Any], preferences: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Ensure the given segment has its LLM-generated content populated. | |
| Called just-in-time when the segment is about to be played. | |
| For music/news/podcast segments, this also fetches the actual track/news/podcast | |
| if they haven't been fetched yet (lazy loading). | |
| """ | |
| seg_type = segment.get("type") | |
| # Use stored prefs if available (from skeleton), otherwise use passed prefs | |
| prefs = segment.get("_prefs", preferences) | |
| if seg_type == "intro": | |
| if not segment.get("content"): | |
| segment["content"] = self._generate_intro(prefs) | |
| elif seg_type == "outro": | |
| if not segment.get("content"): | |
| segment["content"] = self._generate_outro(prefs) | |
| elif seg_type == "music": | |
| # Lazy fetch track if not yet fetched | |
| if not segment.get("track"): | |
| # Use RAG to get personalized recommendations | |
| genres = prefs.get('favorite_genres', ['pop']) | |
| mood = prefs.get('mood', 'happy') | |
| # Use stored user_id if available | |
| user_id = prefs.get('_user_id') or self.current_user_id | |
| # Query RAG for music preferences based on listening history | |
| rag_query = f"What music genres and styles does the user prefer? Favorite genres: {', '.join(genres)}, Mood: {mood}" | |
| print(f"\n🎵 [RAG] Querying for genre recommendations for user {user_id}...") | |
| rag_context = self.rag_system.query_user_context(rag_query, user_id=user_id, top_k=3) | |
| original_genres = genres.copy() | |
| # Extract genre suggestions from RAG if available | |
| if rag_context: | |
| print(f"🎵 [RAG] Analyzing {len(rag_context)} context documents for genre suggestions...") | |
| for i, ctx in enumerate(rag_context): | |
| ctx_text = ctx.get('text', '').lower() | |
| score = ctx.get('score', 0) | |
| # Look for genre mentions in context | |
| for genre in ['pop', 'rock', 'jazz', 'classical', 'electronic', 'hip-hop', 'country', 'indie', 'rap', 'blues', 'folk', 'metal', 'reggae', 'soul', 'r&b']: | |
| if genre in ctx_text and genre not in genres: | |
| genres.append(genre) | |
| print(f" ✅ [RAG] Suggested genre from context #{i+1} (score: {score:.4f}): {genre}") | |
| else: | |
| print(f" ℹ️ [RAG] No context found, using original genres: {original_genres}") | |
| # Use RAG-enhanced genre selection | |
| selected_genre = random.choice(genres) if genres else 'pop' | |
| if selected_genre not in original_genres: | |
| print(f"🎵 [RAG] Using RAG-suggested genre: {selected_genre} (was: {original_genres})") | |
| else: | |
| print(f"🎵 [RAG] Using genre: {selected_genre} (from preferences)") | |
| # Update preferences with RAG-suggested genre | |
| updated_prefs = prefs.copy() | |
| if selected_genre not in updated_prefs.get('favorite_genres', []): | |
| updated_prefs['favorite_genres'] = updated_prefs.get('favorite_genres', []) + [selected_genre] | |
| # Use _plan_music_segment which handles RAG preferences and duplicate checking | |
| music_segment = self._plan_music_segment(updated_prefs, user_id=user_id) | |
| segment["track"] = music_segment.get("track") | |
| track = segment.get("track") | |
| if track and not segment.get("commentary"): | |
| # Use stored user_id if available | |
| user_id = prefs.get('_user_id') or self.current_user_id | |
| # Use RAG context for more personalized commentary | |
| rag_query = f"User's music listening history and preferences for {track.get('title', 'this track')} by {track.get('artist', 'this artist')}" | |
| print(f"\n🎤 [RAG] Querying for commentary context for user {user_id}: {track.get('title')} by {track.get('artist')}") | |
| rag_context = self.rag_system.query_user_context(rag_query, user_id=user_id, top_k=2) | |
| if rag_context: | |
| print(f"🎤 [RAG] Using {len(rag_context)} context documents for personalized commentary") | |
| for i, ctx in enumerate(rag_context): | |
| score = ctx.get('score', 0) | |
| preview = ctx.get('text', '')[:80].replace('\n', ' ') | |
| print(f" 📄 Context #{i+1} (score: {score:.4f}): {preview}...") | |
| else: | |
| print(f" ℹ️ [RAG] No context found, generating standard commentary") | |
| segment["commentary"] = self._generate_music_commentary(track, prefs, rag_context=rag_context) | |
| elif seg_type == "news": | |
| # Lazy fetch news if not yet fetched | |
| if not segment.get("news_items"): | |
| interests = prefs.get('interests', ['world']) | |
| category = random.choice(interests) | |
| segment["news_items"] = self.news_server.fetch_news(category=category, limit=3) | |
| news_items = segment.get("news_items", []) | |
| if news_items and not segment.get("script_batches"): | |
| script_batches = self._generate_news_script(news_items, prefs) | |
| segment["script_batches"] = script_batches | |
| segment["script"] = script_batches | |
| elif seg_type == "podcast": | |
| # Lazy fetch podcast if not yet fetched | |
| if not segment.get("podcast"): | |
| interests = prefs.get('podcast_interests', ['technology']) | |
| category = random.choice(interests) | |
| podcasts = self.podcast_server.get_trending_podcasts(category=category, limit=1) | |
| segment["podcast"] = podcasts[0] if podcasts else None | |
| podcast = segment.get("podcast") | |
| if podcast and not segment.get("intro"): | |
| segment["intro"] = self._generate_podcast_intro(podcast, prefs) | |
| elif seg_type == "story": | |
| if not segment.get("content"): | |
| mood = prefs.get("mood", "happy") | |
| interests = prefs.get("interests", ["technology"]) | |
| segment["content"] = self._generate_story(mood, interests, prefs) | |
| return segment | |
| def get_all_mcp_tools(self) -> List[Dict[str, Any]]: | |
| """Get all available MCP tools from servers""" | |
| tools = [] | |
| tools.extend(self.music_server.get_tools_definition()) | |
| tools.extend(self.news_server.get_tools_definition()) | |
| tools.extend(self.podcast_server.get_tools_definition()) | |
| return tools | |