"""MCP Server for Music Recommendations"""

import json
import os
import random
import re
import socket
import time as time_module
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import requests
import yt_dlp

# Genres for which search_free_music may fall back to built-in demo streams.
_KNOWN_GENRES = (
    "pop", "rock", "jazz", "classical", "electronic", "country",
    "indie", "rap", "blues", "folk", "hip-hop",
)

# Phrases stripped from a spoken request before it is used as a search query.
# They are applied in this order, each as a whole-word match, so that words
# merely containing a filler ("playlist", "display") are left intact.
_FILLER_PHRASES = (
    "play", "put on", "listen to", "want to hear", "i want to", "can you", "please",
)
_FILLER_PATTERNS = tuple(
    re.compile(rf"\b{re.escape(phrase)}\b") for phrase in _FILLER_PHRASES
)

# Substrings of an exception message that mark a DNS/network failure worth retrying.
_NETWORK_ERROR_KEYWORDS = (
    "Failed to resolve", "No address associated", "NameResolutionError", "gaierror",
)


def _ensure_music_keyword(query: str) -> str:
    """Return *query* with " music" appended unless it already mentions music."""
    return query if "music" in query.lower() else f"{query} music"


def _demo_track(title: str, artist: str, video_id: str, genre: str) -> Dict[str, Any]:
    """Build one built-in demo track dictionary for a YouTube video ID."""
    return {
        "title": title,
        "artist": artist,
        "url": f"https://www.youtube.com/watch?v={video_id}",
        "youtube_id": video_id,
        "duration": 0,  # Demo entries carry no duration
        "genre": genre,
        "source": "youtube",
    }


# Hard-coded fallback tracks, keyed by genre; used only when every search fails.
_DEMO_TRACKS = {
    "pop": [
        _demo_track("Lofi Hip Hop Radio", "ChilledCow", "jfKfPfyJRdk", "pop"),
        _demo_track("Synthwave Radio", "Free Music", "4xDzrJKXOOY", "pop"),
    ],
    "rock": [_demo_track("Rock Music Stream", "Free Music", "jfKfPfyJRdk", "rock")],
    "jazz": [_demo_track("Jazz Music", "Free Music", "jfKfPfyJRdk", "jazz")],
    "classical": [_demo_track("Classical Music", "Free Music", "jfKfPfyJRdk", "classical")],
    "electronic": [_demo_track("Electronic Beats", "Free Music", "jfKfPfyJRdk", "electronic")],
}


@dataclass
class Track:
    """Music track information"""

    title: str
    artist: str
    url: str
    duration: int
    genre: str


class MusicMCPServer:
    """MCP Server for music recommendations and playback.

    Searches are attempted through several backends in turn (a Modal-hosted
    proxy, the YouTube Data API, yt-dlp, SoundCloud). All search methods are
    best-effort: failures are printed and reported as an empty result rather
    than raised.
    """

    # Endpoint of the hosted search proxy queried by search_youtube_via_modal_proxy.
    MODAL_PROXY_URL = "https://nikitaxmakarov--youtube-search.modal.run"

    def __init__(self):
        """Set up server metadata, the on-disk cache directory and in-memory state."""
        self.name = "music_server"
        self.description = "Provides music recommendations and free music tracks"
        self.cache_dir = "music_cache"
        os.makedirs(self.cache_dir, exist_ok=True)

        # Cache for embed check results to avoid repeated lookups (video_id -> bool)
        self._embed_cache = {}

        # Rate limiting for YouTube calls
        self._last_youtube_call = 0
        self._min_call_interval = 3.0  # Minimum 3 seconds between YouTube calls

        # Track recently played to avoid repeats
        self._recently_played = []  # List of video IDs, oldest first
        self._max_recent = 20  # Remember last 20 tracks

    def _check_youtube_available(self) -> bool:
        """Return True if the hostname www.youtube.com resolves via DNS."""
        try:
            socket.gethostbyname('www.youtube.com')
            return True
        except socket.gaierror:
            return False

    def _rate_limit_youtube(self):
        """Sleep as needed so calls are at least ``_min_call_interval`` seconds apart."""
        elapsed = time_module.time() - self._last_youtube_call
        if elapsed < self._min_call_interval:
            sleep_time = self._min_call_interval - elapsed
            print(f"⏳ Rate limiting: waiting {sleep_time:.1f}s before YouTube call...")
            time_module.sleep(sleep_time)
        self._last_youtube_call = time_module.time()

    def _add_to_recently_played(self, video_id: str):
        """Remember *video_id* as recently played, evicting the oldest entry when full."""
        if video_id and video_id not in self._recently_played:
            self._recently_played.append(video_id)
            if len(self._recently_played) > self._max_recent:
                self._recently_played.pop(0)

    def _is_recently_played(self, video_id: str) -> bool:
        """Return True if *video_id* is in the recently played list."""
        return video_id in self._recently_played

    def check_video_embeddable(self, video_id: str) -> bool:
        """
        Check if a YouTube video is available and embeddable.

        The verdict (including a failed check) is cached per video ID for the
        lifetime of this instance.

        Args:
            video_id: YouTube video ID

        Returns:
            True if video is embeddable, False otherwise
        """
        # Check cache first
        if video_id in self._embed_cache:
            return self._embed_cache[video_id]

        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'skip_download': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                url = f"https://www.youtube.com/watch?v={video_id}"
                info = ydl.extract_info(url, download=False)

            if not info:
                print(f"⚠️ Video {video_id}: not found")
                self._embed_cache[video_id] = False
                return False

            # Check if video is available
            availability = info.get('availability')
            if availability and availability != 'public':
                print(f"⚠️ Video {video_id}: not public ({availability})")
                self._embed_cache[video_id] = False
                return False

            # Check if video is age-restricted. ``or 0`` covers an explicit
            # None value, which would otherwise raise on the comparison and be
            # misreported as a failed check.
            if (info.get('age_limit') or 0) > 0:
                print(f"⚠️ Video {video_id}: age restricted")
                self._embed_cache[video_id] = False
                return False

            # Live streams are allowed, but flagged
            if info.get('is_live'):
                print(f"ℹ️ Video {video_id}: live stream (may have embed issues)")

            # Only an explicit False blocks embedding; a missing field is accepted
            if info.get('playable_in_embed', True) is False:
                print(f"⚠️ Video {video_id}: not playable in embed")
                self._embed_cache[video_id] = False
                return False

            print(f"✅ Video {video_id}: embeddable")
            self._embed_cache[video_id] = True
            return True

        except Exception as e:
            error_msg = str(e).lower()
            if 'unavailable' in error_msg or 'private' in error_msg or 'removed' in error_msg:
                print(f"⚠️ Video {video_id}: unavailable - {e}")
            else:
                print(f"⚠️ Video {video_id}: check failed - {e}")
            self._embed_cache[video_id] = False
            return False

    def search_youtube_via_modal_proxy(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search YouTube through the Modal proxy endpoint.

        Args:
            query: Search text; " music" is appended unless already present.
            limit: Result count forwarded to the proxy.

        Returns:
            The ``tracks`` value of the proxy's JSON response when it reports
            success, otherwise an empty list. Errors are printed, not raised.
        """
        tracks = []
        try:
            modal_url = self.MODAL_PROXY_URL
            # Don't add "music" if query already contains it
            search_query = _ensure_music_keyword(query)
            params = {
                'query': search_query,
                'limit': limit
            }
            print(f"🔍 Calling Modal proxy: {modal_url} with query: '{search_query}'")
            response = requests.get(modal_url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
            print(f"📦 Modal proxy response: success={data.get('success')}, tracks_count={len(data.get('tracks', []))}")

            if data.get('success') and 'tracks' in data:
                tracks = data['tracks']
                for track in tracks:
                    print(f" ✓ Found via Modal proxy: {track.get('title', 'Unknown')} by {track.get('artist', 'Unknown')}")
            else:
                error_msg = data.get('error', 'Unknown error')
                print(f"⚠️ Modal proxy returned error: {error_msg}")
        except requests.exceptions.RequestException as e:
            print(f"⚠️ Modal proxy request failed: {e}")
        except Exception as e:
            print(f"⚠️ Modal proxy search failed: {e}")
            traceback.print_exc()
        return tracks

    def search_youtube_via_api(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search YouTube using the Data API v3 search endpoint over HTTPS.

        The API key is read from the ``YOUTUBE_API_KEY`` environment variable;
        when it is unset an empty key is sent.
        NOTE(review): key-less requests are presumably rejected by the API; in
        that case the error is printed and an empty list is returned.

        Args:
            query: Search text; " music" is appended unless already present.
            limit: Maximum number of tracks to return.

        Returns:
            List of track dictionaries (``duration`` is always 0 because the
            search endpoint's snippet does not include it).
        """
        tracks = []
        # Append "music" only when the query does not already contain it
        search_query = _ensure_music_keyword(query)

        try:
            url = "https://www.googleapis.com/youtube/v3/search"
            params = {
                'part': 'snippet',
                'q': search_query,
                'type': 'video',
                # Ask for extra results to filter from; the API caps maxResults at 50
                'maxResults': min(limit * 2, 50),
                'order': 'relevance',
                'safeSearch': 'moderate',
                'key': os.environ.get('YOUTUBE_API_KEY', ''),
            }
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            for item in data.get('items', [])[:limit]:
                video_id = item.get('id', {}).get('videoId')
                if not video_id:
                    continue
                snippet = item.get('snippet', {})
                track = {
                    "title": snippet.get('title', 'Unknown'),
                    "artist": snippet.get('channelTitle', 'Unknown Artist'),
                    "url": f"https://www.youtube.com/watch?v={video_id}",
                    "youtube_id": video_id,
                    "duration": 0,  # Not part of the search snippet
                    "genre": query.split()[0] if query else "unknown",
                    "source": "youtube_api",
                    "thumbnail": snippet.get('thumbnails', {}).get('default', {}).get('url', '')
                }
                tracks.append(track)
                print(f" ✓ Found via API: {track['title']} by {track['artist']}")
        except Exception as e:
            print(f"⚠️ YouTube API search failed: {e}")
        return tracks

    def search_youtube_music(self, query: str, limit: int = 5, fast: bool = False, check_embed: bool = False) -> List[Dict[str, Any]]:
        """
        Search for free music on YouTube, trying several backends in order.

        Order: Modal proxy, YouTube Data API, then yt-dlp (with retries on
        DNS/network errors). The first backend that yields tracks wins.

        Args:
            query: Search query (e.g., "pop music", "jazz instrumental", "song name")
            limit: Maximum number of tracks to return
            fast: Currently unused; the yt-dlp stage always uses flat extraction.
                Kept for interface compatibility.
            check_embed: If True, the yt-dlp stage verifies each video is
                embeddable (slower but more reliable)

        Returns:
            List of track dictionaries with YouTube URLs (possibly empty)
        """
        # Apply rate limiting
        self._rate_limit_youtube()

        # First, try Modal proxy
        print("🔍 Trying Modal proxy search...")
        modal_tracks = self.search_youtube_via_modal_proxy(query, limit)
        if modal_tracks:
            print(f"✅ Found {len(modal_tracks)} tracks via Modal proxy")
            return modal_tracks

        # Fallback to the YouTube Data API
        print("⚠️ Modal proxy failed, trying YouTube Data API...")
        api_tracks = self.search_youtube_via_api(query, limit)
        if api_tracks:
            print(f"✅ Found {len(api_tracks)} tracks via YouTube API")
            return api_tracks

        # Last resort: yt-dlp
        print("⚠️ YouTube API failed, trying yt-dlp...")

        # Check if YouTube is accessible before attempting search
        if not self._check_youtube_available():
            print("⚠️ YouTube is not accessible (DNS/network issue). Skipping yt-dlp search.")
            return []

        return self._search_youtube_via_ytdlp(query, limit, check_embed)

    def _search_youtube_via_ytdlp(self, query: str, limit: int, check_embed: bool) -> List[Dict[str, Any]]:
        """Run the yt-dlp search with up to three attempts and exponential backoff."""
        tracks = []
        max_retries = 3
        retry_delay = 2  # Start with 2 seconds

        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            try:
                # Try to resolve DNS first (helps diagnose network issues)
                try:
                    socket.gethostbyname('www.youtube.com')
                except socket.gaierror:
                    if is_last_attempt:
                        print(f"❌ DNS resolution failed after {max_retries} attempts. YouTube may be blocked or network unavailable.")
                        return tracks
                    print(f"⚠️ DNS resolution failed (attempt {attempt + 1}/{max_retries}), retrying in {retry_delay}s...")
                    time_module.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                    continue

                entries = self._fetch_ytdlp_entries(query, limit)
                if entries:
                    tracks.extend(self._tracks_from_entries(entries, query, limit, check_embed))
                # Success! Break out of retry loop
                break

            except Exception as e:
                error_str = str(e)
                # Only DNS/network errors are retried
                if any(keyword in error_str for keyword in _NETWORK_ERROR_KEYWORDS):
                    if is_last_attempt:
                        print(f"❌ Network error after {max_retries} attempts. YouTube unavailable.")
                        return tracks
                    print(f"⚠️ Network/DNS error (attempt {attempt + 1}/{max_retries}): {error_str[:100]}...")
                    print(f" Retrying in {retry_delay}s...")
                    time_module.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                    continue
                # Other errors, don't retry
                print(f"❌ Error searching YouTube: {e}")
                traceback.print_exc()
                break

        return tracks

    def _fetch_ytdlp_entries(self, query: str, limit: int) -> Optional[List[Any]]:
        """Run one flat ``ytsearch`` through yt-dlp and return its raw entries (or None)."""
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,  # Fast: only get basic info
            'default_search': 'ytsearch',
            'socket_timeout': 30,  # Increase timeout for network issues
        }
        # Over-fetch so that filtering out recently played still leaves enough
        search_limit = max(limit * 3, 15)

        # Don't add "music" if query already contains it or is specific
        if "music" not in query.lower() and len(query.split()) < 4:
            search_query = f"ytsearch{search_limit}:{query} music"
        else:
            search_query = f"ytsearch{search_limit}:{query}"
        print(f"🔍 YouTube search query: '{search_query}'")

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            results = ydl.extract_info(search_query, download=False)

        # Handle different result formats from yt-dlp
        if isinstance(results, dict):
            if 'entries' in results:
                return results['entries']
            if 'id' in results:
                return [results]
            return None
        if isinstance(results, list):
            return results
        return None

    def _tracks_from_entries(self, entries: List[Any], query: str, limit: int, check_embed: bool) -> List[Dict[str, Any]]:
        """Shuffle yt-dlp entries and turn up to *limit* acceptable ones into tracks."""
        # Filter valid entries
        valid_entries = []
        for entry in entries:
            if entry and isinstance(entry, dict):
                video_id = entry.get('id') or entry.get('url', '')
                if video_id and video_id != 'None':
                    valid_entries.append(entry)

        # Randomly shuffle to avoid always picking top results
        if len(valid_entries) > 1:
            random.shuffle(valid_entries)

        tracks = []
        for entry in valid_entries:
            if len(tracks) >= limit:
                break
            video_id = entry.get('id') or entry.get('url', '')
            if not video_id:
                continue

            # Skip recently played tracks
            if self._is_recently_played(video_id):
                print(f" ✗ Skipping recently played: {entry.get('title', 'Unknown')}")
                continue

            # Check if video is embeddable (optional)
            if check_embed and not self.check_video_embeddable(video_id):
                print(f" ✗ Skipping non-embeddable: {entry.get('title', 'Unknown')}")
                continue

            track = {
                "title": entry.get('title', 'Unknown'),
                "artist": entry.get('uploader', entry.get('channel', 'Unknown Artist')),
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "youtube_id": video_id,
                "duration": entry.get('duration', 0),
                "genre": query.split()[0] if query else "unknown",
                "source": "youtube"
            }
            tracks.append(track)
            # Mark as recently played
            self._add_to_recently_played(video_id)
            print(f" ✓ Found: {track['title']} by {track['artist']}")
        return tracks

    def search_soundcloud_via_api(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search SoundCloud through its ``api-v2`` HTTP search endpoint.

        The client ID is read from the ``SOUNDCLOUD_CLIENT_ID`` environment
        variable and defaults to the literal placeholder value.
        NOTE(review): the placeholder is presumably not a valid client ID —
        confirm how this method is expected to authenticate.

        Args:
            query: Search text; " music" is always appended.
            limit: Maximum number of tracks to return.

        Returns:
            List of track dictionaries for streamable results; empty on any
            non-200 response or error.
        """
        tracks = []
        try:
            search_query = f"{query} music"
            url = "https://api-v2.soundcloud.com/search/tracks"
            params = {
                'q': search_query,
                'limit': limit,
                'client_id': os.environ.get('SOUNDCLOUD_CLIENT_ID', 'CLIENT_ID_PLACEHOLDER'),
            }
            response = requests.get(url, params=params, timeout=30)
            if response.status_code == 200:
                data = response.json()
                for item in data.get('collection', [])[:limit]:
                    if not item.get('streamable'):
                        continue
                    track = {
                        "title": item.get('title', 'Unknown'),
                        "artist": item.get('user', {}).get('username', 'Unknown Artist'),
                        "url": item.get('permalink_url', ''),
                        "duration": item.get('duration', 0) // 1000,  # Convert from ms to seconds
                        "genre": query.split()[0] if query else "unknown",
                        "source": "soundcloud_api"
                    }
                    tracks.append(track)
                    print(f" ✓ Found on SoundCloud: {track['title']} by {track['artist']}")
        except Exception as e:
            print(f"⚠️ SoundCloud API search failed: {e}")
        return tracks

    def search_soundcloud_music(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """
        Search for free music on SoundCloud via yt-dlp

        Args:
            query: Search query (e.g., "pop music", "jazz instrumental")
            limit: Number of results

        Returns:
            List of track dictionaries with SoundCloud URLs (empty on error)
        """
        tracks = []
        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': True,
                'default_search': 'scsearch',
                'format': 'bestaudio/best',
            }
            # Ask for `limit` results explicitly: a bare "scsearch:" prefix
            # (what default_search alone expands to) yields a single result.
            search_query = f"scsearch{max(limit, 1)}:{query} music"
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                results = ydl.extract_info(search_query, download=False)

            entries = (results or {}).get('entries') or []
            for entry in list(entries)[:limit]:
                if not entry:
                    continue
                tracks.append({
                    "title": entry.get('title', 'Unknown'),
                    "artist": entry.get('uploader', 'Unknown Artist'),
                    "url": entry.get('url', entry.get('webpage_url', '')),
                    "duration": entry.get('duration', 0),
                    "genre": query.split()[0] if query else "unknown",
                    "source": "soundcloud"
                })
        except Exception as e:
            print(f"Error searching SoundCloud: {e}")
        return tracks

    def get_audio_url(self, youtube_url: str) -> Optional[str]:
        """
        Get direct audio URL from YouTube video

        Args:
            youtube_url: YouTube video URL

        Returns:
            The ``url`` field of the extracted info, or None when it is
            missing or extraction fails
        """
        try:
            ydl_opts = {
                'format': 'bestaudio/best',
                'quiet': True,
                'no_warnings': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(youtube_url, download=False)
            if info and 'url' in info:
                return info['url']
        except Exception as e:
            print(f"Error getting audio URL: {e}")
        return None

    def download_audio(self, youtube_url: str, output_path: str) -> Optional[str]:
        """
        Download audio from YouTube to local file

        Args:
            youtube_url: YouTube video URL
            output_path: Path to save audio file (a trailing ".mp3" is stripped,
                since yt-dlp adds the extension itself)

        Returns:
            Path to downloaded file or None
        """
        try:
            # Ensure output path doesn't have extension (yt-dlp adds it)
            if output_path.endswith('.mp3'):
                output_path = output_path[:-4]

            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': output_path + '.%(ext)s',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }],
                'quiet': True,
                'no_warnings': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([youtube_url])

            # Prefer the .mp3 result, then fall back to other known extensions
            for ext in ('.mp3', '.m4a', '.webm', '.ogg'):
                candidate = output_path + ext
                if os.path.exists(candidate):
                    return candidate
        except Exception as e:
            print(f"Error downloading audio: {e}")
        return None

    def search_free_music(self, genre: str = "pop", mood: str = "happy", limit: int = 5) -> List[Dict[str, Any]]:
        """
        Search for free music tracks based on genre and mood

        Tries YouTube, then SoundCloud, then YouTube again with the genre
        alone; as a last step known genres fall back to built-in demo tracks.
        Every returned track has its ``genre`` set to the requested genre
        (lower-cased and stripped).

        Args:
            genre: Music genre (pop, rock, jazz, classical, electronic, or any
                custom genre like "reggae", "metal", "k-pop", etc.)
            mood: Mood of music (happy, sad, energetic, calm, etc.)
            limit: Number of tracks to return

        Returns:
            List of track dictionaries; empty when a custom genre finds nothing
        """
        genre_lower = genre.lower().strip()
        # Single-word and multi-word genres are combined with the mood the same way
        query = f"{genre_lower} {mood}" if mood else genre_lower

        def tagged(found: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
            # Overwrite the backend-guessed genre with the one that was asked for
            for track in found:
                track["genre"] = genre_lower
            return found

        # Try YouTube first
        youtube_tracks = self.search_youtube_music(query, limit=limit)
        if youtube_tracks:
            return tagged(youtube_tracks)

        # Try SoundCloud as fallback
        soundcloud_tracks = self.search_soundcloud_music(query, limit=limit)
        if soundcloud_tracks:
            return tagged(soundcloud_tracks)

        # If both searches fail, try searching just the genre without mood
        if mood and mood != "happy":
            youtube_tracks = self.search_youtube_music(genre_lower, limit=limit)
            if youtube_tracks:
                return tagged(youtube_tracks)

        # Only use demo tracks for known genres
        if genre_lower in _KNOWN_GENRES:
            demo = _DEMO_TRACKS.get(genre_lower, _DEMO_TRACKS["pop"])
            # Return copies labelled with the requested genre, so a known genre
            # without its own demo list is not reported as "pop"
            return [dict(track, genre=genre_lower) for track in demo[:limit]]

        # For custom genres, return empty list so the caller can handle the failure
        print(f"⚠️ No tracks found for custom genre: {genre_lower}")
        return []

    def search_by_request(self, song_request: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Search for music based on voice request

        Keys read from *song_request*: ``original_text``, ``song``, ``artist``,
        ``genre`` and ``mood`` (all optional).

        Args:
            song_request: Dictionary with song request details (may include custom genre)

        Returns:
            List of matching tracks (at most one per backend call), or an
            empty list when every fallback fails
        """
        requested_genre = song_request.get("genre")

        def with_requested_genre(found: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
            # Preserve the caller's genre in track metadata
            if requested_genre:
                for track in found:
                    track["genre"] = requested_genre
            return found

        # Build search query from request; prefer the original text, cleaned up
        if song_request.get("original_text"):
            original = song_request["original_text"]
            query = original.lower()
            # Remove filler phrases as whole words only
            for pattern in _FILLER_PATTERNS:
                query = pattern.sub(" ", query).strip()
            query = " ".join(query.split())  # Clean up spaces
            # If query is too short or unclear, use original
            if len(query.split()) < 2:
                query = original
        else:
            query_parts = []
            custom_genre = requested_genre
            known_genres = _KNOWN_GENRES + ("hip hop",)
            is_custom_genre = custom_genre and (custom_genre not in known_genres or " " in custom_genre)

            song = song_request.get("song")
            if song and len(song.split()) > 1:
                query_parts.append(song)
            elif song:
                # Single word song - a custom genre takes its place
                query_parts.append(custom_genre if is_custom_genre else song)

            if song_request.get("artist"):
                query_parts.append(song_request["artist"])

            # Add genre if it is not already in query
            if custom_genre and custom_genre not in " ".join(query_parts):
                query_parts.append(custom_genre)

            query = " ".join(query_parts) if query_parts else "music"

        print(f"🔍 Searching for music: '{query}'")

        # Try Modal proxy first, then the full YouTube search chain
        tracks = self.search_youtube_via_modal_proxy(query, limit=1)
        if not tracks:
            tracks = self.search_youtube_music(query, limit=1, fast=True)
        if tracks:
            with_requested_genre(tracks)
            print(f"✅ Found track on YouTube: {tracks[0].get('title', 'Unknown')}")
            return tracks

        # If YouTube fails, try SoundCloud
        print("⚠️ YouTube search failed, trying SoundCloud...")
        tracks = self.search_soundcloud_music(query, limit=1)
        if tracks:
            with_requested_genre(tracks)
            print("✅ Found track on SoundCloud")
            return tracks

        # If both fail, try a simpler query
        print("⚠️ Both searches failed, trying simplified query...")
        if song_request.get("song"):
            simple_query = song_request["song"]
            tracks = self.search_youtube_via_modal_proxy(simple_query, limit=1)
            if not tracks:
                tracks = self.search_youtube_music(simple_query, limit=1, fast=True)
            if tracks:
                return with_requested_genre(tracks)

        # Last resort: search by genre (works for both known and custom genres)
        if requested_genre:
            mood = song_request.get("mood", "happy")
            tracks = self.search_free_music(genre=requested_genre, mood=mood, limit=1)
            if tracks:
                return tracks

        print("❌ No tracks found")
        return []

    def get_personalized_playlist(self, user_preferences: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Generate personalized playlist based on user preferences

        Args:
            user_preferences: Dictionary with user's music preferences; reads
                ``favorite_genres`` (default ``["pop"]``) and ``mood``
                (default ``"happy"``)

        Returns:
            Up to two tracks for each of the first three favorite genres
        """
        favorite_genres = user_preferences.get("favorite_genres", ["pop"])
        mood = user_preferences.get("mood", "happy")

        playlist = []
        for genre in favorite_genres[:3]:  # Mix top 3 genres
            playlist.extend(self.search_free_music(genre=genre, mood=mood, limit=2))
        return playlist

    def get_tools_definition(self) -> List[Dict[str, Any]]:
        """Return MCP tools definition for this server"""
        return [
            {
                "name": "search_music",
                "description": "Search for free music tracks by genre and mood",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "genre": {
                            "type": "string",
                            "description": "Music genre (pop, rock, jazz, classical, electronic, or any custom genre like reggae, metal, k-pop, etc.)"
                        },
                        "mood": {
                            "type": "string",
                            "description": "Mood of the music (happy, sad, energetic, calm)"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "Number of tracks to return"
                        }
                    },
                    "required": ["genre"]
                }
            },
            {
                "name": "get_personalized_playlist",
                "description": "Get a personalized playlist based on user preferences",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "user_preferences": {
                            "type": "object",
                            "description": "User's music preferences including favorite genres and current mood"
                        }
                    },
                    "required": ["user_preferences"]
                }
            }
        ]