Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import gradio as gr | |
| import urllib.parse | |
| import base64 | |
| import secrets | |
| import hashlib | |
| # Spotify API credentials | |
| CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") | |
| CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") | |
| REDIRECT_URI = os.getenv("REDIRECT_URI", "https://lenpanda-spotify-mcp.hf.space") | |
| SPOTIFY_AUTH_URL = "https://accounts.spotify.com/authorize" | |
| SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token" | |
| SCOPE = "playlist-modify-public playlist-modify-private user-read-private" | |
| # Global variables to store auth state | |
| access_token = None | |
| user_info = None | |
| auth_state = None | |
| def generate_auth_url(): | |
| """Generate Spotify authorization URL""" | |
| global auth_state | |
| auth_state = secrets.token_urlsafe(32) | |
| params = { | |
| 'client_id': CLIENT_ID, | |
| 'response_type': 'code', | |
| 'redirect_uri': REDIRECT_URI, | |
| 'scope': SCOPE, | |
| 'state': auth_state, | |
| 'show_dialog': 'true' | |
| } | |
| auth_url = f"{SPOTIFY_AUTH_URL}?{urllib.parse.urlencode(params)}" | |
| return f""" | |
| ## π΅ Spotify Authentication Required | |
| To use this app, you need to authenticate with Spotify: | |
| 1. **Click the link below** to authorize this app: | |
| **[π Login with Spotify]({auth_url})** | |
| 2. **After authorization**, you'll be redirected back with a code in the URL | |
| 3. **Copy the entire URL** from your browser's address bar | |
| 4. **Paste it** in the "Authorization URL" field below | |
| 5. **Click "Complete Authentication"** | |
| ### Current Status: β Not Authenticated | |
| """ | |
| def complete_auth(auth_url_input): | |
| """Complete authentication using the callback URL""" | |
| global access_token, user_info, auth_state | |
| if not auth_url_input or not auth_url_input.strip(): | |
| return "β Please paste the callback URL from Spotify", "" | |
| try: | |
| # Parse the URL to extract code and state | |
| parsed_url = urllib.parse.urlparse(auth_url_input) | |
| params = urllib.parse.parse_qs(parsed_url.query) | |
| code = params.get('code', [None])[0] | |
| state = params.get('state', [None])[0] | |
| error = params.get('error', [None])[0] | |
| if error: | |
| return f"β Spotify authentication error: {error}", "" | |
| if not code: | |
| return "β No authorization code found in URL. Please make sure you copied the complete callback URL.", "" | |
| if state != auth_state: | |
| return "β Invalid state parameter. Please try the authentication process again.", "" | |
| # Exchange code for access token | |
| token_data = { | |
| "grant_type": "authorization_code", | |
| "code": code, | |
| "redirect_uri": REDIRECT_URI, | |
| "client_id": CLIENT_ID, | |
| "client_secret": CLIENT_SECRET | |
| } | |
| response = requests.post(SPOTIFY_TOKEN_URL, data=token_data) | |
| token_json = response.json() | |
| if response.status_code == 200: | |
| access_token = token_json.get("access_token") | |
| # Get user info | |
| headers = {"Authorization": f"Bearer {access_token}"} | |
| user_response = requests.get("https://api.spotify.com/v1/me", headers=headers) | |
| if user_response.status_code == 200: | |
| user_info = user_response.json() | |
| success_msg = f"β Successfully authenticated as: **{user_info.get('display_name', 'Unknown User')}**" | |
| return success_msg, success_msg | |
| else: | |
| return "β Authentication successful, but couldn't get user info.", "β Authenticated" | |
| else: | |
| error_msg = token_json.get('error_description', 'Failed to get access token') | |
| return f"β Token exchange failed: {error_msg}", "" | |
| except Exception as e: | |
| return f"β Error processing authentication: {str(e)}", "" | |
| def get_auth_status(): | |
| """Get current authentication status""" | |
| if access_token and user_info: | |
| return f"β Authenticated as: **{user_info.get('display_name', 'Unknown User')}**" | |
| else: | |
| return "β Not authenticated" | |
| def search_songs(query): | |
| """Search for songs on Spotify""" | |
| if not access_token: | |
| return "β Please authenticate first!" | |
| if not query.strip(): | |
| return "Please enter a search query." | |
| headers = {"Authorization": f"Bearer {access_token}"} | |
| params = {"q": query, "type": "track", "limit": 10} | |
| try: | |
| response = requests.get("https://api.spotify.com/v1/search", headers=headers, params=params) | |
| if response.status_code == 200: | |
| data = response.json() | |
| tracks = data.get("tracks", {}).get("items", []) | |
| if not tracks: | |
| return "No tracks found." | |
| result = "π΅ **Search Results:**\n\n" | |
| for i, track in enumerate(tracks, 1): | |
| artists = ", ".join([artist["name"] for artist in track["artists"]]) | |
| result += f"**{i}. {track['name']}** by {artists}\n" | |
| result += f" Album: {track['album']['name']}\n" | |
| result += f" URI: `{track['uri']}`\n" | |
| result += f" Preview: {track.get('preview_url', 'Not available')}\n\n" | |
| return result | |
| else: | |
| error_msg = response.json().get('error', {}).get('message', 'Unknown error') | |
| return f"β Error searching: {error_msg}" | |
| except Exception as e: | |
| return f"β Search error: {str(e)}" | |
| def get_user_playlists(): | |
| """Get user's playlists""" | |
| if not access_token: | |
| return "β Please authenticate first!" | |
| headers = {"Authorization": f"Bearer {access_token}"} | |
| try: | |
| response = requests.get("https://api.spotify.com/v1/me/playlists", headers=headers) | |
| if response.status_code == 200: | |
| data = response.json() | |
| playlists = data.get("items", []) | |
| if not playlists: | |
| return "No playlists found." | |
| result = "π **Your Playlists:**\n\n" | |
| for playlist in playlists: | |
| result += f"**{playlist['name']}**\n" | |
| result += f" ID: `{playlist['id']}`\n" | |
| result += f" Tracks: {playlist['tracks']['total']}\n" | |
| result += f" Public: {'Yes' if playlist['public'] else 'No'}\n\n" | |
| return result | |
| else: | |
| error_msg = response.json().get('error', {}).get('message', 'Unknown error') | |
| return f"β Error getting playlists: {error_msg}" | |
| except Exception as e: | |
| return f"β Playlist error: {str(e)}" | |
| def add_song_to_playlist(playlist_id, track_uri): | |
| """Add a song to a playlist""" | |
| if not access_token: | |
| return "β Please authenticate first!" | |
| if not playlist_id.strip() or not track_uri.strip(): | |
| return "β Please provide both playlist ID and track URI." | |
| # Clean and format track URI | |
| track_uri = track_uri.strip() | |
| if not track_uri.startswith("spotify:track:"): | |
| if "open.spotify.com/track/" in track_uri: | |
| track_id = track_uri.split("/track/")[1].split("?")[0] | |
| track_uri = f"spotify:track:{track_id}" | |
| elif len(track_uri) == 22: # Spotify track ID length | |
| track_uri = f"spotify:track:{track_uri}" | |
| else: | |
| return "β Invalid track URI format. Use spotify:track:... or Spotify URL" | |
| url = f"https://api.spotify.com/v1/playlists/{playlist_id.strip()}/tracks" | |
| headers = { | |
| "Authorization": f"Bearer {access_token}", | |
| "Content-Type": "application/json" | |
| } | |
| data = {"uris": [track_uri]} | |
| try: | |
| response = requests.post(url, headers=headers, json=data) | |
| if response.status_code == 201: | |
| return "β Song added to playlist successfully!" | |
| else: | |
| error_data = response.json().get('error', {}) | |
| error_msg = error_data.get('message', 'Unknown error') | |
| return f"β Error adding song: {error_msg}" | |
| except Exception as e: | |
| return f"β Add song error: {str(e)}" | |
| def create_playlist(playlist_name, description="", public=True): | |
| """Create a new playlist""" | |
| if not access_token: | |
| return "β Please authenticate first!" | |
| if not playlist_name.strip(): | |
| return "β Please provide a playlist name." | |
| if not user_info: | |
| return "β User information not available." | |
| url = f"https://api.spotify.com/v1/users/{user_info['id']}/playlists" | |
| headers = { | |
| "Authorization": f"Bearer {access_token}", | |
| "Content-Type": "application/json" | |
| } | |
| data = { | |
| "name": playlist_name.strip(), | |
| "description": description.strip(), | |
| "public": public | |
| } | |
| try: | |
| response = requests.post(url, headers=headers, json=data) | |
| if response.status_code == 201: | |
| playlist = response.json() | |
| return f"β Playlist created successfully!\n\n**{playlist['name']}**\nID: `{playlist['id']}`\nURL: {playlist['external_urls']['spotify']}" | |
| else: | |
| error_data = response.json().get('error', {}) | |
| error_msg = error_data.get('message', 'Unknown error') | |
| return f"β Error creating playlist: {error_msg}" | |
| except Exception as e: | |
| return f"β Create playlist error: {str(e)}" | |
| # Create Gradio interface | |
| with gr.Blocks(title="π΅ Spotify Playlist Manager", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# π΅ Spotify Playlist Manager") | |
| with gr.Tabs(): | |
| # Authentication Tab | |
| with gr.TabItem("π Authentication"): | |
| auth_instructions = gr.Markdown(generate_auth_url()) | |
| with gr.Row(): | |
| auth_url_input = gr.Textbox( | |
| label="Paste the callback URL here after authorizing", | |
| placeholder="https://lenpanda-spotify-mcp.hf.space/?code=...", | |
| lines=2 | |
| ) | |
| with gr.Row(): | |
| complete_auth_btn = gr.Button("Complete Authentication", variant="primary") | |
| refresh_auth_btn = gr.Button("Refresh Auth Status") | |
| auth_status = gr.Markdown("β Not authenticated") | |
| complete_auth_btn.click( | |
| complete_auth, | |
| inputs=auth_url_input, | |
| outputs=[gr.Markdown(), auth_status] | |
| ) | |
| refresh_auth_btn.click( | |
| get_auth_status, | |
| outputs=auth_status | |
| ) | |
| # Search Tab | |
| with gr.TabItem("π Search Songs"): | |
| gr.Markdown("Search for songs on Spotify") | |
| with gr.Row(): | |
| search_input = gr.Textbox( | |
| label="Search for songs", | |
| placeholder="Enter song name, artist, or album" | |
| ) | |
| search_btn = gr.Button("Search", variant="primary") | |
| search_output = gr.Markdown("Enter a search query above") | |
| search_btn.click(search_songs, inputs=search_input, outputs=search_output) | |
| # Playlists Tab | |
| with gr.TabItem("π Playlists"): | |
| gr.Markdown("View and manage your playlists") | |
| with gr.Row(): | |
| get_playlists_btn = gr.Button("Get My Playlists", variant="primary") | |
| playlists_output = gr.Markdown("Click the button above to load your playlists") | |
| get_playlists_btn.click(get_user_playlists, outputs=playlists_output) | |
| # Add Song Tab | |
| with gr.TabItem("β Add Song"): | |
| gr.Markdown("Add a song to one of your playlists") | |
| playlist_id_input = gr.Textbox( | |
| label="Playlist ID", | |
| placeholder="Get this from the Playlists tab" | |
| ) | |
| track_uri_input = gr.Textbox( | |
| label="Track URI or Spotify URL", | |
| placeholder="spotify:track:... or https://open.spotify.com/track/..." | |
| ) | |
| add_song_btn = gr.Button("Add Song to Playlist", variant="primary") | |
| add_song_output = gr.Markdown("") | |
| add_song_btn.click( | |
| add_song_to_playlist, | |
| inputs=[playlist_id_input, track_uri_input], | |
| outputs=add_song_output | |
| ) | |
| # Create Playlist Tab | |
| with gr.TabItem("β Create Playlist"): | |
| gr.Markdown("Create a new playlist") | |
| playlist_name_input = gr.Textbox( | |
| label="Playlist Name", | |
| placeholder="My Awesome Playlist" | |
| ) | |
| playlist_desc_input = gr.Textbox( | |
| label="Description (optional)", | |
| placeholder="A description for your playlist" | |
| ) | |
| playlist_public = gr.Checkbox(label="Make playlist public", value=True) | |
| create_playlist_btn = gr.Button("Create Playlist", variant="primary") | |
| create_playlist_output = gr.Markdown("") | |
| create_playlist_btn.click( | |
| create_playlist, | |
| inputs=[playlist_name_input, playlist_desc_input, playlist_public], | |
| outputs=create_playlist_output | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |