Spaces:
Sleeping
Sleeping
| """MCP Server for Music Recommendations""" | |
| import json | |
| import requests | |
| import yt_dlp | |
| import os | |
| import random | |
| import socket | |
| import time as time_module | |
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass | |
@dataclass
class Track:
    """Music track information.

    Declared as a dataclass so instances can be built directly from the
    annotated fields, e.g. ``Track(title=..., artist=..., url=...,
    duration=..., genre=...)``, and compared by value.
    """

    # Display title of the track
    title: str
    # Performer or uploader name
    artist: str
    # Page or stream URL for the track
    url: str
    # Track length (presumably seconds, matching the track dicts built by the
    # search methods - confirm with callers)
    duration: int
    # Genre label
    genre: str
| class MusicMCPServer: | |
| """MCP Server for music recommendations and playback""" | |
def __init__(self):
    """Initialise server identity, the cache directory and in-memory state.

    Creates the ``music_cache`` directory (relative to the current working
    directory) if it does not exist yet.
    """
    # Identity advertised by this server
    self.name = "music_server"
    self.description = "Provides music recommendations and free music tracks"

    # On-disk cache location; created eagerly so later writes cannot fail
    # on a missing directory
    self.cache_dir = "music_cache"
    os.makedirs(self.cache_dir, exist_ok=True)

    # video_id -> bool results of embeddability checks, so each video is
    # only looked up once
    self._embed_cache = dict()

    # Throttling state for YouTube lookups: timestamp of the last call and
    # the minimum spacing (seconds) enforced between two calls
    self._last_youtube_call = 0
    self._min_call_interval = 3.0

    # Most recently returned video IDs (oldest first) and how many to keep,
    # used to avoid serving the same track again too soon
    self._recently_played = list()
    self._max_recent = 20
def _check_youtube_available(self) -> bool:
    """Report whether ``www.youtube.com`` can be resolved via DNS.

    Returns:
        True when the hostname lookup succeeds, False when it raises
        ``socket.gaierror``.
    """
    try:
        socket.gethostbyname('www.youtube.com')
    except socket.gaierror:
        return False
    else:
        return True
def _rate_limit_youtube(self):
    """Block until at least ``_min_call_interval`` seconds separate two calls.

    Sleeps for the remaining part of the interval when the previous call was
    too recent, then records the current time as the new "last call" stamp.
    """
    since_last_call = time_module.time() - self._last_youtube_call
    if since_last_call < self._min_call_interval:
        sleep_time = self._min_call_interval - since_last_call
        print(f"⏳ Rate limiting: waiting {sleep_time:.1f}s before YouTube call...")
        time_module.sleep(sleep_time)
    # Stamp after any sleep so the next interval starts from now
    self._last_youtube_call = time_module.time()
def _add_to_recently_played(self, video_id: str):
    """Remember ``video_id`` as recently played.

    Empty IDs and IDs already in the history are ignored. When the history
    grows beyond ``_max_recent`` entries the oldest one is dropped.
    """
    history = self._recently_played
    if not video_id or video_id in history:
        return
    history.append(video_id)
    if len(history) > self._max_recent:
        # Oldest entry lives at the front of the list
        del history[0]
def _is_recently_played(self, video_id: str) -> bool:
    """Return True when ``video_id`` is in the recently-played history."""
    history = self._recently_played
    return video_id in history
def check_video_embeddable(self, video_id: str) -> bool:
    """
    Check if a YouTube video is available and embeddable.

    The verdict is stored in ``self._embed_cache`` so every video ID is only
    looked up once; failed lookups are cached as ``False`` as well.

    Args:
        video_id: YouTube video ID
    Returns:
        True if video is embeddable, False otherwise
    """
    # Check cache first
    if video_id in self._embed_cache:
        return self._embed_cache[video_id]

    embeddable = False
    try:
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'skip_download': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            url = f"https://www.youtube.com/watch?v={video_id}"
            info = ydl.extract_info(url, download=False)

        # First failing check decides the rejection reason; None means OK
        rejection = None
        if not info:
            rejection = "not found"
        else:
            availability = info.get('availability')
            if availability and availability != 'public':
                rejection = f"not public ({availability})"
            # 'age_limit' may be present but None; treat that as "no limit"
            # instead of letting `None > 0` raise and reject the video.
            elif (info.get('age_limit') or 0) > 0:
                # Age-restricted videos often block embedding
                rejection = "age restricted"
            else:
                if info.get('is_live'):
                    # Live streams are allowed but may misbehave when embedded
                    print(f"ℹ️ Video {video_id}: live stream (may have embed issues)")
                # Only an explicit False blocks embedding; a missing field
                # is treated as playable
                if info.get('playable_in_embed', True) is False:
                    rejection = "not playable in embed"

        if rejection is None:
            print(f"✅ Video {video_id}: embeddable")
            embeddable = True
        else:
            print(f"⚠️ Video {video_id}: {rejection}")
    except Exception as e:
        error_msg = str(e).lower()
        if any(marker in error_msg for marker in ('unavailable', 'private', 'removed')):
            print(f"⚠️ Video {video_id}: unavailable - {e}")
        else:
            print(f"⚠️ Video {video_id}: check failed - {e}")

    self._embed_cache[video_id] = embeddable
    return embeddable
def search_youtube_via_modal_proxy(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Search YouTube through the Modal-hosted proxy endpoint.

    Args:
        query: Search text; " music" is appended unless it already contains
            the word "music"
        limit: Number of results requested from the proxy
    Returns:
        The ``tracks`` value of the proxy response when it reports success,
        otherwise an empty list (request and parsing errors are printed,
        not raised)
    """
    found = []
    try:
        endpoint = "https://nikitaxmakarov--youtube-search.modal.run"
        # Avoid queries like "jazz music music"
        if "music" in query.lower():
            search_query = query
        else:
            search_query = f"{query} music"

        print(f"🔍 Calling Modal proxy: {endpoint} with query: '{search_query}'")
        response = requests.get(
            endpoint,
            params={'query': search_query, 'limit': limit},
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()
        print(f"📦 Modal proxy response: success={payload.get('success')}, tracks_count={len(payload.get('tracks', []))}")

        if not (payload.get('success') and 'tracks' in payload):
            error_msg = payload.get('error', 'Unknown error')
            print(f"⚠️ Modal proxy returned error: {error_msg}")
        else:
            found = payload['tracks']
            for item in found:
                print(f" ✓ Found via Modal proxy: {item.get('title', 'Unknown')} by {item.get('artist', 'Unknown')}")
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Modal proxy request failed: {e}")
    except Exception as e:
        print(f"⚠️ Modal proxy search failed: {e}")
        import traceback
        traceback.print_exc()
    return found
def search_youtube_via_api(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Search YouTube using the official Data API v3 ``search`` endpoint.

    The API key is read from the ``YOUTUBE_API_KEY`` environment variable.
    The Data API rejects requests that carry no key, so when the variable is
    unset the search is skipped and an empty list is returned without any
    network traffic.

    Args:
        query: Search text; " music" is appended unless it already contains
            the word "music"
        limit: Maximum number of tracks to return
    Returns:
        List of track dictionaries (``source`` is ``"youtube_api"``); empty
        when no key is configured or the request fails
    """
    tracks = []

    api_key = os.environ.get("YOUTUBE_API_KEY", "").strip()
    if not api_key:
        print("⚠️ YouTube API search skipped: YOUTUBE_API_KEY is not set")
        return tracks

    # Don't add "music" if query already contains it
    search_query = query if "music" in query.lower() else f"{query} music"

    try:
        url = "https://www.googleapis.com/youtube/v3/search"
        params = {
            'part': 'snippet',
            'q': search_query,
            'type': 'video',
            # Ask for extra results so filtering can still fill `limit`;
            # the API accepts at most 50
            'maxResults': max(1, min(limit * 2, 50)),
            'order': 'relevance',
            'safeSearch': 'moderate',
            'key': api_key,
        }
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()

        for item in data.get('items') or []:
            if len(tracks) >= limit:
                break
            video_id = (item.get('id') or {}).get('videoId')
            if not video_id:
                # Entry is not a playable video result
                continue
            snippet = item.get('snippet') or {}
            track = {
                "title": snippet.get('title', 'Unknown'),
                "artist": snippet.get('channelTitle', 'Unknown Artist'),
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "youtube_id": video_id,
                "duration": 0,  # Search results carry no duration information
                "genre": query.split()[0] if query else "unknown",
                "source": "youtube_api",
                "thumbnail": snippet.get('thumbnails', {}).get('default', {}).get('url', '')
            }
            tracks.append(track)
            print(f" ✓ Found via API: {track['title']} by {track['artist']}")
    except Exception as e:
        print(f"⚠️ YouTube API search failed: {e}")
    return tracks
def search_youtube_music(self, query: str, limit: int = 5, fast: bool = False, check_embed: bool = False) -> List[Dict[str, Any]]:
    """
    Search YouTube for music, trying three backends in order.

    The Modal proxy is tried first, then the YouTube Data API, and finally a
    direct yt-dlp search with retries (exponential backoff) on DNS/network
    errors. The first backend that yields tracks wins. Tracks returned by the
    yt-dlp path are recorded as recently played and recently played videos
    are skipped.

    Args:
        query: Search query (e.g., "pop music", "jazz instrumental", "song name")
        limit: Maximum number of tracks to return
        fast: Accepted for compatibility; not consulted by this method
        check_embed: If True, drop yt-dlp results that fail
            ``check_video_embeddable`` (slower but more reliable)
    Returns:
        List of track dictionaries with YouTube URLs (possibly empty)
    """
    self._rate_limit_youtube()

    # Backend 1: Modal proxy
    print("🔍 Trying Modal proxy search...")
    proxy_hits = self.search_youtube_via_modal_proxy(query, limit)
    if proxy_hits:
        print(f"✅ Found {len(proxy_hits)} tracks via Modal proxy")
        return proxy_hits

    # Backend 2: YouTube Data API
    print("⚠️ Modal proxy failed, trying YouTube Data API...")
    api_hits = self.search_youtube_via_api(query, limit)
    if api_hits:
        print(f"✅ Found {len(api_hits)} tracks via YouTube API")
        return api_hits

    # Backend 3: yt-dlp, only worth attempting when YouTube resolves
    print("⚠️ YouTube API failed, trying yt-dlp...")
    if not self._check_youtube_available():
        print("⚠️ YouTube is not accessible (DNS/network issue). Skipping yt-dlp search.")
        return []

    tracks = []
    max_retries = 3
    retry_delay = 2  # Seconds; doubled after every failed attempt
    network_error_markers = ["Failed to resolve", "No address associated", "NameResolutionError", "gaierror"]

    for attempt in range(max_retries):
        retries_left = attempt < max_retries - 1
        try:
            # Resolve DNS up front so network problems are reported clearly
            try:
                socket.gethostbyname('www.youtube.com')
            except socket.gaierror:
                if not retries_left:
                    print(f"❌ DNS resolution failed after {max_retries} attempts. YouTube may be blocked or network unavailable.")
                    return tracks
                print(f"⚠️ DNS resolution failed (attempt {attempt + 1}/{max_retries}), retrying in {retry_delay}s...")
                time_module.sleep(retry_delay)
                retry_delay *= 2
                continue

            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': True,  # Basic info only, no per-video extraction
                'default_search': 'ytsearch',
                'socket_timeout': 30,
            }
            # Over-fetch so shuffling and recently-played filtering still
            # leave enough candidates
            search_limit = max(limit * 3, 15)

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Short queries without "music" get it appended; longer or
                # explicit queries are used verbatim
                if "music" not in query.lower() and len(query.split()) < 4:
                    search_query = f"ytsearch{search_limit}:{query} music"
                else:
                    search_query = f"ytsearch{search_limit}:{query}"
                print(f"🔍 YouTube search query: '{search_query}'")
                results = ydl.extract_info(search_query, download=False)

                # Normalise the possible result shapes to a sequence of entries
                if isinstance(results, dict):
                    if 'entries' in results:
                        entries = results['entries']
                    elif 'id' in results:
                        entries = [results]
                    else:
                        entries = None
                elif isinstance(results, list):
                    entries = results
                else:
                    entries = None

                if entries:
                    # Keep only dict entries that expose a usable video ID
                    candidates = []
                    for entry in entries:
                        if not (entry and isinstance(entry, dict)):
                            continue
                        video_id = entry.get('id') or entry.get('url', '')
                        if video_id and video_id != 'None':
                            candidates.append((video_id, entry))

                    # Shuffle so the top hits are not always picked
                    if len(candidates) > 1:
                        random.shuffle(candidates)

                    for video_id, entry in candidates:
                        if len(tracks) >= limit:
                            break
                        if self._is_recently_played(video_id):
                            print(f" ✗ Skipping recently played: {entry.get('title', 'Unknown')}")
                            continue
                        if check_embed and not self.check_video_embeddable(video_id):
                            print(f" ✗ Skipping non-embeddable: {entry.get('title', 'Unknown')}")
                            continue
                        track = {
                            "title": entry.get('title', 'Unknown'),
                            "artist": entry.get('uploader', entry.get('channel', 'Unknown Artist')),
                            "url": f"https://www.youtube.com/watch?v={video_id}",
                            "youtube_id": video_id,
                            "duration": entry.get('duration', 0),
                            "genre": query.split()[0] if query else "unknown",
                            "source": "youtube"
                        }
                        tracks.append(track)
                        self._add_to_recently_played(video_id)
                        print(f" ✓ Found: {track['title']} by {track['artist']}")

            # The attempt completed without raising: stop retrying
            break
        except Exception as e:
            error_str = str(e)
            if not any(marker in error_str for marker in network_error_markers):
                # Not a DNS/network problem, so retrying will not help
                print(f"❌ Error searching YouTube: {e}")
                import traceback
                traceback.print_exc()
                break
            if not retries_left:
                print(f"❌ Network error after {max_retries} attempts. YouTube unavailable.")
                return tracks
            print(f"⚠️ Network/DNS error (attempt {attempt + 1}/{max_retries}): {error_str[:100]}...")
            print(f" Retrying in {retry_delay}s...")
            time_module.sleep(retry_delay)
            retry_delay *= 2
            continue
    return tracks
def search_soundcloud_via_api(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """Search SoundCloud through its unofficial ``api-v2`` search endpoint.

    The client ID is read from the ``SOUNDCLOUD_CLIENT_ID`` environment
    variable. Without one the search is skipped and an empty list is
    returned, instead of sending a placeholder ID that cannot succeed.

    Args:
        query: Search text; " music" is appended unless it already contains
            the word "music"
        limit: Maximum number of tracks to request and return
    Returns:
        List of track dictionaries (``source`` is ``"soundcloud_api"``) for
        streamable results; empty on any failure
    """
    tracks = []

    client_id = os.environ.get("SOUNDCLOUD_CLIENT_ID", "").strip()
    if not client_id:
        print("⚠️ SoundCloud API search skipped: SOUNDCLOUD_CLIENT_ID is not set")
        return tracks

    try:
        # Don't add "music" if query already contains it
        search_query = query if "music" in query.lower() else f"{query} music"
        url = "https://api-v2.soundcloud.com/search/tracks"
        params = {
            'q': search_query,
            'limit': limit,
            'client_id': client_id,
        }
        response = requests.get(url, params=params, timeout=30)
        if response.status_code != 200:
            print(f"⚠️ SoundCloud API search failed: HTTP {response.status_code}")
            return tracks

        data = response.json()
        for item in data.get('collection') or []:
            if len(tracks) >= limit:
                break
            if not item.get('streamable'):
                continue
            track = {
                "title": item.get('title', 'Unknown'),
                # `user` and `duration` can be present but null in the payload
                "artist": (item.get('user') or {}).get('username', 'Unknown Artist'),
                "url": item.get('permalink_url', ''),
                "duration": (item.get('duration') or 0) // 1000,  # Convert from ms to seconds
                "genre": query.split()[0] if query else "unknown",
                "source": "soundcloud_api"
            }
            tracks.append(track)
            print(f" ✓ Found on SoundCloud: {track['title']} by {track['artist']}")
    except Exception as e:
        print(f"⚠️ SoundCloud API search failed: {e}")
    return tracks
def search_soundcloud_music(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Search for free music on SoundCloud via yt-dlp.

    Args:
        query: Search query (e.g., "pop music", "jazz instrumental");
            " music" is always appended
        limit: Maximum number of results; values below 1 return an empty
            list without searching
    Returns:
        List of track dictionaries with SoundCloud URLs (empty on failure)
    """
    from itertools import islice

    tracks = []
    if limit < 1:
        return tracks

    try:
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
            'default_search': 'scsearch',
            'format': 'bestaudio/best',
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # An explicit "scsearchN:" prefix is required to get N results;
            # relying on `default_search` alone yields a single hit, so
            # `limit` would never be honoured.
            search_query = f"scsearch{limit}:{query} music"
            results = ydl.extract_info(search_query, download=False)

            # `results` may be None and `entries` may not support slicing,
            # so cap the iteration with islice instead of `[:limit]`
            entries = (results or {}).get('entries') or []
            for entry in islice(entries, limit):
                if not entry:
                    continue
                tracks.append({
                    "title": entry.get('title', 'Unknown'),
                    "artist": entry.get('uploader', 'Unknown Artist'),
                    "url": entry.get('url', entry.get('webpage_url', '')),
                    "duration": entry.get('duration', 0),
                    "genre": query.split()[0] if query else "unknown",
                    "source": "soundcloud"
                })
    except Exception as e:
        print(f"Error searching SoundCloud: {e}")
    return tracks
def get_audio_url(self, youtube_url: str) -> Optional[str]:
    """
    Resolve the direct audio stream URL for a YouTube video.

    Args:
        youtube_url: YouTube video URL
    Returns:
        The ``url`` field of the extracted info, or None when the field is
        missing or extraction fails (the error is printed)
    """
    extraction_options = dict(
        format='bestaudio/best',
        quiet=True,
        no_warnings=True,
    )
    try:
        with yt_dlp.YoutubeDL(extraction_options) as downloader:
            metadata = downloader.extract_info(youtube_url, download=False)
            if 'url' not in metadata:
                return None
            return metadata['url']
    except Exception as e:
        print(f"Error getting audio URL: {e}")
    return None
def download_audio(self, youtube_url: str, output_path: str) -> Optional[str]:
    """
    Download a video's audio track to a local file.

    The download is configured with an FFmpeg post-processor that extracts
    MP3 audio at quality "192".

    Args:
        youtube_url: YouTube video URL
        output_path: Target path without extension (a trailing ".mp3" is
            stripped because the downloader adds the extension itself)
    Returns:
        Path of the file found on disk afterwards (".mp3" preferred, then
        ".m4a", ".webm", ".ogg"), or None when nothing was produced or the
        download failed (the error is printed)
    """
    try:
        base_path = output_path[:-4] if output_path.endswith('.mp3') else output_path
        mp3_extraction = {
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }
        download_options = {
            'format': 'bestaudio/best',
            'outtmpl': base_path + '.%(ext)s',
            'postprocessors': [mp3_extraction],
            'quiet': True,
            'no_warnings': True,
        }
        with yt_dlp.YoutubeDL(download_options) as downloader:
            downloader.download([youtube_url])

        # Expected result first, then other containers the download may
        # have left behind
        for extension in ('.mp3', '.m4a', '.webm', '.ogg'):
            candidate = base_path + extension
            if os.path.exists(candidate):
                return candidate
    except Exception as e:
        print(f"Error downloading audio: {e}")
    return None
def search_free_music(self, genre: str = "pop", mood: str = "happy", limit: int = 5) -> List[Dict[str, Any]]:
    """
    Find free music tracks for a genre and mood.

    Lookup order: YouTube with "<genre> <mood>", SoundCloud with the same
    query, then (only when a non-default mood was given) YouTube with the
    genre alone. If everything fails, built-in demo tracks are returned for
    known genres and an empty list for any other genre. Tracks coming from a
    search get their ``genre`` field overwritten with the normalised genre.

    Args:
        genre: Music genre (pop, rock, jazz, classical, electronic, or any custom genre like "reggae", "metal", "k-pop", etc.)
        mood: Mood of music (happy, sad, energetic, calm, etc.)
        limit: Number of tracks to return
    Returns:
        List of track dictionaries
    """
    genre_lower = genre.lower().strip()

    def tagged(found):
        # Overwrite the genre so custom genres survive in the metadata
        for item in found:
            item["genre"] = genre_lower
        return found

    # Single-word and multi-word genres are queried the same way
    query = f"{genre_lower} {mood}" if mood else genre_lower

    found = self.search_youtube_music(query, limit=limit)
    if found:
        return tagged(found)

    found = self.search_soundcloud_music(query, limit=limit)
    if found:
        return tagged(found)

    # Retry with the bare genre when a specific mood was requested
    if mood and mood != "happy":
        found = self.search_youtube_music(genre_lower, limit=limit)
        if found:
            return tagged(found)

    # Demo fallback exists only for known genres; custom genres report
    # failure so the caller can handle it
    known_genres = ["pop", "rock", "jazz", "classical", "electronic", "country", "indie", "rap", "blues", "folk", "hip-hop"]
    if genre_lower not in known_genres:
        print(f"⚠️ No tracks found for custom genre: {genre_lower}")
        return []

    def demo(title, artist, demo_genre, video_id="jfKfPfyJRdk"):
        return {
            "title": title,
            "artist": artist,
            "url": f"https://www.youtube.com/watch?v={video_id}",
            "youtube_id": video_id,
            "duration": 0,  # Streams have no fixed length
            "genre": demo_genre,
            "source": "youtube"
        }

    demo_tracks = {
        "pop": [
            demo("Lofi Hip Hop Radio", "ChilledCow", "pop"),
            demo("Synthwave Radio", "Free Music", "pop", video_id="4xDzrJKXOOY"),
        ],
        "rock": [demo("Rock Music Stream", "Free Music", "rock")],
        "jazz": [demo("Jazz Music", "Free Music", "jazz")],
        "classical": [demo("Classical Music", "Free Music", "classical")],
        "electronic": [demo("Electronic Beats", "Free Music", "electronic")],
    }
    # Known genres without a dedicated demo list fall back to the pop demos
    selection = demo_tracks.get(genre_lower, demo_tracks["pop"])
    return selection[:limit]
def search_by_request(self, song_request: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Search for music based on voice request
    Supports both known genres and custom genres specified by users

    The query is derived from ``original_text`` when present (filler phrases
    such as "play" or "please" are removed as whole words only, so names
    like "Coldplay" or "playlist" stay intact); otherwise it is assembled
    from the ``song``, ``artist`` and ``genre`` fields. Lookup order: YouTube
    (Modal proxy, then direct search), SoundCloud, YouTube again with the
    bare song title, and finally a genre/mood search.

    Args:
        song_request: Dictionary with song request details. Keys read here:
            ``original_text``, ``song``, ``artist``, ``genre``, ``mood``
    Returns:
        List of matching tracks (empty when nothing was found). When the
        request carries a genre, it is written into each returned track.
    """
    import re  # Local import: only needed for filler-phrase removal

    requested_genre = song_request.get("genre")

    def tag_genre(found):
        # Preserve the requested (possibly custom) genre in track metadata
        if requested_genre:
            for item in found:
                item["genre"] = requested_genre
        return found

    def youtube_lookup(text):
        # Modal proxy first (works in restricted networks), then the direct
        # search chain as a fallback
        return (
            self.search_youtube_via_modal_proxy(text, limit=1)
            or self.search_youtube_music(text, limit=1, fast=True)
        )

    if song_request.get("original_text"):
        original = song_request["original_text"]
        filler_words = ["play", "put on", "listen to", "want to hear", "i want to", "can you", "please"]
        query = original.lower()
        for filler in filler_words:
            # Word boundaries keep the filler from eating parts of other
            # words ("coldplay" must not become "cold")
            query = re.sub(rf"\b{re.escape(filler)}\b", " ", query)
        query = " ".join(query.split())  # Collapse leftover whitespace
        # If the cleaned query is too short to be meaningful, use original
        if len(query.split()) < 2:
            query = original
    else:
        query_parts = []
        custom_genre = requested_genre
        known_genres = ["pop", "rock", "jazz", "classical", "electronic", "country", "indie", "rap", "blues", "folk", "hip-hop", "hip hop"]
        # Multi-word genres and genres outside the known list count as custom
        is_custom_genre = custom_genre and (custom_genre not in known_genres or " " in custom_genre)

        song = song_request.get("song")
        if song and len(song.split()) > 1:
            query_parts.append(song)
        elif song:
            # A single-word "song" next to a custom genre is treated as the
            # genre itself
            query_parts.append(custom_genre if is_custom_genre else song)

        if song_request.get("artist"):
            query_parts.append(song_request["artist"])

        # Add the genre unless the query already mentions it
        if custom_genre and custom_genre not in " ".join(query_parts):
            query_parts.append(custom_genre)

        query = " ".join(query_parts) if query_parts else "music"

    print(f"🔍 Searching for music: '{query}'")

    tracks = youtube_lookup(query)
    if tracks:
        tag_genre(tracks)
        print(f"✅ Found track on YouTube: {tracks[0].get('title', 'Unknown')}")
        return tracks

    # If YouTube fails, try SoundCloud
    print("⚠️ YouTube search failed, trying SoundCloud...")
    tracks = self.search_soundcloud_music(query, limit=1)
    if tracks:
        tag_genre(tracks)
        print(f"✅ Found track on SoundCloud")
        return tracks

    # If both fail, retry with just the song title
    print("⚠️ Both searches failed, trying simplified query...")
    if song_request.get("song"):
        tracks = youtube_lookup(song_request["song"])
        if tracks:
            return tag_genre(tracks)

    # Last resort: search by genre (works for both known and custom genres)
    if requested_genre:
        mood = song_request.get("mood", "happy")
        tracks = self.search_free_music(genre=requested_genre, mood=mood, limit=1)
        if tracks:
            return tracks

    print("❌ No tracks found")
    return []
def get_personalized_playlist(self, user_preferences: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Build a playlist from the user's favourite genres and mood.

    Up to two tracks are requested for each of the first three favourite
    genres and concatenated in genre order.

    Args:
        user_preferences: Dictionary with user's music preferences; reads
            ``favorite_genres`` (default ``["pop"]``) and ``mood``
            (default ``"happy"``)
    Returns:
        List of recommended tracks
    """
    mood = user_preferences.get("mood", "happy")
    top_genres = user_preferences.get("favorite_genres", ["pop"])[:3]
    return [
        track
        for genre in top_genres
        for track in self.search_free_music(genre=genre, mood=mood, limit=2)
    ]
def get_tools_definition(self) -> List[Dict[str, Any]]:
    """Return the MCP tool definitions exposed by this server.

    Two tools are described: ``search_music`` (genre required, mood and
    limit optional) and ``get_personalized_playlist`` (user_preferences
    required).
    """
    def prop(type_name, description):
        # One JSON-schema style property entry
        return {"type": type_name, "description": description}

    def tool(name, description, properties, required):
        # One tool entry with an object-typed parameter schema
        return {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required
            }
        }

    search_music = tool(
        "search_music",
        "Search for free music tracks by genre and mood",
        {
            "genre": prop("string", "Music genre (pop, rock, jazz, classical, electronic, or any custom genre like reggae, metal, k-pop, etc.)"),
            "mood": prop("string", "Mood of the music (happy, sad, energetic, calm)"),
            "limit": prop("integer", "Number of tracks to return"),
        },
        ["genre"],
    )
    personalized_playlist = tool(
        "get_personalized_playlist",
        "Get a personalized playlist based on user preferences",
        {
            "user_preferences": prop("object", "User's music preferences including favorite genres and current mood"),
        },
        ["user_preferences"],
    )
    return [search_music, personalized_playlist]