import base64
import json
import os
import secrets
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlencode

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from mcp.server.fastmcp import FastMCP

# Load variables from a .env file (if any) before the os.getenv calls below.
load_dotenv()

# Spotify endpoints: accounts host for OAuth, API host for data calls.
SPOTIFY_ACCOUNTS_BASE = "https://accounts.spotify.com"
SPOTIFY_API_BASE = "https://api.spotify.com/v1"

# OAuth client settings; an empty string means "not configured".
SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "").strip()
SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "").strip()
SPOTIFY_REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI", "").strip()

# Space-separated scopes requested by the operator (or the built-in default).
_SPOTIFY_SCOPES_RAW = os.getenv(
    "SPOTIFY_SCOPES",
    "user-read-private user-read-email playlist-read-private "
    "playlist-read-collaborative playlist-modify-private playlist-modify-public "
    "user-library-read user-read-recently-played",
).strip()

# Scopes that are always requested, whatever SPOTIFY_SCOPES says.
_REQUIRED_SCOPES = [
    "user-read-private",
    "user-read-email",
    "playlist-read-private",
    "playlist-read-collaborative",
    "playlist-modify-private",
    "playlist-modify-public",
    "user-library-read",
    "user-library-modify",
    "user-read-recently-played",
    "user-top-read",
]

# Keep the configured order and append any required scope that is missing.
_scope_parts = _SPOTIFY_SCOPES_RAW.split()
_scope_parts += [
    scope for scope in _REQUIRED_SCOPES if scope not in _scope_parts
]
SPOTIFY_SCOPES = " ".join(_scope_parts)

# Absolute path of the JSON file where OAuth tokens are persisted.
SPOTIFY_TOKEN_FILE = Path(
    os.getenv("SPOTIFY_TOKEN_FILE", "spotify_tokens.json")
).resolve()

# Public SSE URL reported by the root endpoint.
MCP_SSE_URL = os.getenv(
    "MCP_PUBLIC_SSE_URL",
    "https://pharmaia-demo-mcp-server-spotify.hf.space/gradio_api/mcp/sse",
).strip()

# Keep OAuth states in memory for CSRF protection.
OAUTH_STATE_TTL_SECONDS = 600
# Maps each issued OAuth ``state`` value to the epoch second it was created.
_oauth_states: dict[str, int] = {}


class SpotifyAuthError(RuntimeError):
    """Error raised by this module for config, auth, API and input problems."""


def _require_spotify_config() -> None:
    """Raise SpotifyAuthError listing every required setting that is empty."""
    required = {
        "SPOTIFY_CLIENT_ID": SPOTIFY_CLIENT_ID,
        "SPOTIFY_CLIENT_SECRET": SPOTIFY_CLIENT_SECRET,
        "SPOTIFY_REDIRECT_URI": SPOTIFY_REDIRECT_URI,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise SpotifyAuthError(
            "Missing required environment variables: " + ", ".join(missing)
        )


def _cleanup_expired_states() -> None:
    """Drop OAuth states older than OAUTH_STATE_TTL_SECONDS."""
    now = int(time.time())
    expired = [
        key
        for key, created_at in _oauth_states.items()
        if (now - created_at) > OAUTH_STATE_TTL_SECONDS
    ]
    for key in expired:
        _oauth_states.pop(key, None)


def _new_oauth_state() -> str:
    """Create, remember and return a fresh random OAuth ``state`` value."""
    _cleanup_expired_states()
    state = secrets.token_urlsafe(24)
    _oauth_states[state] = int(time.time())
    return state


def _consume_oauth_state(state: str) -> bool:
    """Remove ``state`` from the store and report whether it was still valid.

    A state can be consumed only once; unknown or expired states return False.
    """
    _cleanup_expired_states()
    created_at = _oauth_states.pop(state, None)
    if created_at is None:
        return False
    return (int(time.time()) - created_at) <= OAUTH_STATE_TTL_SECONDS


def _spotify_basic_auth_header() -> str:
    """Return base64("client_id:client_secret") for the Basic auth header."""
    token = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8")
    return base64.b64encode(token).decode("utf-8")


def _load_tokens() -> dict[str, Any] | None:
    """Read the token file.

    Returns:
        The stored token dict, or None when the file is missing, unreadable,
        not valid JSON, or does not contain a JSON object.
    """
    if not SPOTIFY_TOKEN_FILE.exists():
        return None
    try:
        data = json.loads(SPOTIFY_TOKEN_FILE.read_text(encoding="utf-8"))
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    # Callers use .get() on the result, so anything but a dict is unusable.
    return data if isinstance(data, dict) else None


def _save_tokens(
    token_payload: dict[str, Any],
    previous: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Merge a token response over ``previous``, stamp it and write it to disk.

    Args:
        token_payload: JSON body returned by the token endpoint.
        previous: Previously stored tokens; their values are kept for any key
            the new payload does not provide.

    Returns:
        The merged dict, including ``saved_at`` and ``expires_at`` (epoch
        seconds).
    """
    now = int(time.time())
    expires_in = int(token_payload.get("expires_in", 3600))

    merged: dict[str, Any] = {**(previous or {}), **token_payload}
    # Keep the old refresh token when the new payload has none (or a falsy one).
    if not merged.get("refresh_token") and previous:
        merged["refresh_token"] = previous.get("refresh_token")
    merged["saved_at"] = now
    merged["expires_at"] = now + expires_in

    SPOTIFY_TOKEN_FILE.write_text(
        json.dumps(merged, indent=2, ensure_ascii=True),
        encoding="utf-8",
    )
    return merged


def _is_expired(tokens: dict[str, Any]) -> bool:
    """Return True when the token expires within the next 30 seconds.

    A missing, null or non-numeric ``expires_at`` counts as expired.
    """
    try:
        expires_at = int(tokens.get("expires_at") or 0)
    except (TypeError, ValueError):
        expires_at = 0
    return int(time.time()) >= (expires_at - 30)


def _error_payload(response: httpx.Response) -> Any:
    """Return the decoded JSON error body, or ``{"error": <text>}`` if not JSON."""
    try:
        return response.json()
    except ValueError:
        return {"error": response.text}


def _post_token_request(data: dict[str, Any], failure_label: str) -> dict[str, Any]:
    """POST ``data`` as a form to the Spotify token endpoint.

    Args:
        data: Form fields of the grant request.
        failure_label: Prefix of the error message on a non-200 response.

    Raises:
        SpotifyAuthError: When the endpoint does not answer with HTTP 200.
    """
    headers = {
        "Authorization": f"Basic {_spotify_basic_auth_header()}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    with httpx.Client(timeout=20.0) as client:
        response = client.post(
            f"{SPOTIFY_ACCOUNTS_BASE}/api/token",
            data=data,
            headers=headers,
        )
        if response.status_code != 200:
            raise SpotifyAuthError(f"{failure_label}: {_error_payload(response)}")
        return response.json()


def _exchange_code_for_tokens(code: str) -> dict[str, Any]:
    """Exchange an authorization ``code`` for tokens and persist them.

    Raises:
        SpotifyAuthError: On missing configuration or a failed exchange.
    """
    _require_spotify_config()
    payload = _post_token_request(
        {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
        },
        "Failed token exchange",
    )
    return _save_tokens(payload)


def _refresh_access_token(tokens: dict[str, Any]) -> dict[str, Any]:
    """Use the stored refresh token to get a new access token and persist it.

    Raises:
        SpotifyAuthError: When no refresh token is stored or the refresh fails.
    """
    refresh_token = tokens.get("refresh_token")
    if not refresh_token:
        raise SpotifyAuthError(
            "No refresh token found. Re-authenticate at /auth/login."
        )
    payload = _post_token_request(
        {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        },
        "Failed token refresh",
    )
    return _save_tokens(payload, previous=tokens)


def _get_valid_access_token() -> str:
    """Return a usable access token, refreshing it first when it is expired.

    Raises:
        SpotifyAuthError: When configuration or stored tokens are missing.
    """
    _require_spotify_config()
    tokens = _load_tokens()
    if not tokens:
        raise SpotifyAuthError(
            "Spotify is not authenticated yet. Open /auth/login first."
        )
    if _is_expired(tokens):
        tokens = _refresh_access_token(tokens)
    access_token = tokens.get("access_token")
    if not access_token:
        raise SpotifyAuthError(
            "Access token missing. Re-authenticate at /auth/login."
        )
    return str(access_token)


def _extract_error_message(payload: Any) -> str:
    """Pull the error text out of an error body (``error`` as dict or string)."""
    if not isinstance(payload, dict):
        return ""
    err = payload.get("error")
    if isinstance(err, dict):
        return str(err.get("message", "")).strip()
    if isinstance(err, str):
        return err.strip()
    return ""


def _spotify_request(
    method: str,
    path: str,
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
) -> Any:
    """Call the Spotify Web API with the stored token.

    On HTTP 401 the token is refreshed and the request is retried once.

    Args:
        method: HTTP method.
        path: Path appended to SPOTIFY_API_BASE (starts with "/").
        params: Optional query parameters.
        json_body: Optional JSON request body.

    Returns:
        The decoded JSON body (a dict for most endpoints, a list for some),
        ``{}`` for an empty body, or ``{"raw": <text>}`` for a non-JSON body.

    Raises:
        SpotifyAuthError: On authentication problems or any status >= 400.
    """
    token = _get_valid_access_token()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{SPOTIFY_API_BASE}{path}"

    with httpx.Client(timeout=20.0) as client:
        response = client.request(
            method, url, params=params, json=json_body, headers=headers,
        )
        if response.status_code == 401:
            # The token was rejected although it looked valid: refresh, retry.
            refreshed = _refresh_access_token(_load_tokens() or {})
            headers["Authorization"] = f"Bearer {refreshed.get('access_token', '')}"
            response = client.request(
                method, url, params=params, json=json_body, headers=headers,
            )

        if response.status_code >= 400:
            payload = _error_payload(response)
            if (
                response.status_code == 403
                and "Insufficient client scope" in _extract_error_message(payload)
            ):
                raise SpotifyAuthError(
                    "Spotify token without required scopes. Re-authenticate at "
                    "/auth/reset then /auth/login (force consent), and confirm "
                    "SPOTIFY_SCOPES includes playlist-modify-private and "
                    "playlist-modify-public."
                )
            raise SpotifyAuthError(
                f"Spotify API error {response.status_code}: {payload}"
            )

        if not response.content:
            return {}
        try:
            return response.json()
        except ValueError:
            return {"raw": response.text}


def _build_auth_url(state: str, show_dialog: bool = True) -> str:
    """Build the Spotify /authorize URL for the given CSRF ``state``."""
    _require_spotify_config()
    params = {
        "response_type": "code",
        "client_id": SPOTIFY_CLIENT_ID,
        "scope": SPOTIFY_SCOPES,
        "redirect_uri": SPOTIFY_REDIRECT_URI,
        "state": state,
        "show_dialog": "true" if show_dialog else "false",
    }
    return f"{SPOTIFY_ACCOUNTS_BASE}/authorize?{urlencode(params)}"


def _token_status() -> dict[str, Any]:
    """Summarize the stored token: presence, file path, expiry and scopes."""
    tokens = _load_tokens() or {}
    return {
        "authenticated": bool(tokens.get("access_token")),
        "token_file": str(SPOTIFY_TOKEN_FILE),
        "expires_at": tokens.get("expires_at"),
        "scopes": tokens.get("scope"),
    }


def _token_scope_set() -> set[str]:
    """Return the scopes recorded in the token file (empty set when unknown)."""
    tokens = _load_tokens() or {}
    # ``or ""`` keeps a stored null from turning into the bogus scope "None".
    scope_str = str(tokens.get("scope") or "").strip()
    return set(scope_str.split())


def _require_any_scope(required_scopes: list[str], action: str) -> None:
    """Raise unless the stored token has at least one of ``required_scopes``.

    When no scope information is stored the check is skipped, so the request
    goes ahead and Spotify itself decides.
    """
    token_scopes = _token_scope_set()
    if not token_scopes:
        return
    if any(scope in token_scopes for scope in required_scopes):
        return
    raise SpotifyAuthError(
        f"Missing scope for {action}. Need one of: {', '.join(required_scopes)}. "
        "Run /auth/reset and /auth/login to re-authorize with updated scopes."
    )


def _short_playlist(playlist: dict[str, Any]) -> dict[str, Any]:
    """Reduce a Spotify playlist object to the fields exposed by the tools."""
    return {
        "id": playlist.get("id"),
        "name": playlist.get("name"),
        "uri": playlist.get("uri"),
        "public": playlist.get("public"),
        "owner": (playlist.get("owner") or {}).get("id"),
        "total_items": (playlist.get("tracks") or {}).get("total"),
        "external_url": (playlist.get("external_urls") or {}).get("spotify"),
    }


def _short_track(track: dict[str, Any]) -> dict[str, Any]:
    """Reduce a Spotify track object to the fields exposed by the tools."""
    return {
        "id": track.get("id"),
        "name": track.get("name"),
        "uri": track.get("uri"),
        # ``or []`` tolerates an explicit null as well as a missing key.
        "artists": [a.get("name") for a in (track.get("artists") or [])],
        "album": (track.get("album") or {}).get("name"),
        "external_url": (track.get("external_urls") or {}).get("spotify"),
    }


def _normalize_spotify_id(ref: str, kind: str) -> str:
    """Extract the bare id from an id, a ``spotify:<kind>:`` URI or a web URL.

    Web URLs may carry a locale segment (``open.spotify.com/intl-es/track/..``),
    a query string and/or a fragment; all of them are removed. Any other input
    is returned stripped but otherwise unchanged.
    """
    value = ref.strip()
    if not value:
        return ""
    if value.startswith(f"spotify:{kind}:"):
        return value.split(":")[-1].strip()
    marker = f"/{kind}/"
    if "open.spotify.com/" in value and marker in value:
        tail = value.split(marker)[-1]
        for separator in ("?", "#"):
            tail = tail.split(separator)[0]
        return tail.strip().strip("/")
    return value


def _normalize_track_id(track_ref: str) -> str:
    """Return the track id for a bare id, ``spotify:track:`` URI or track URL."""
    return _normalize_spotify_id(track_ref, "track")


def _normalize_track_ids(track_ids: list[str], max_items: int = 500) -> list[str]:
    """Normalize and de-duplicate track references, keeping first-seen order.

    Raises:
        SpotifyAuthError: When nothing valid remains or there are more than
            ``max_items`` distinct ids.
    """
    normalized: list[str] = []
    seen: set[str] = set()
    for raw in track_ids:
        item = _normalize_track_id(raw)
        if item and item not in seen:
            normalized.append(item)
            seen.add(item)
    if not normalized:
        raise SpotifyAuthError("Provide at least one valid track id or track uri.")
    if len(normalized) > max_items:
        raise SpotifyAuthError(
            f"Too many track ids. Maximum allowed is {max_items}."
        )
    return normalized


def _normalize_playlist_id(playlist_ref: str) -> str:
    """Return the playlist id for a bare id, ``spotify:playlist:`` URI or URL."""
    return _normalize_spotify_id(playlist_ref, "playlist")


def _chunked(items: list[str], size: int) -> list[list[str]]:
    """Split ``items`` into consecutive slices of at most ``size`` elements."""
    return [items[i : i + size] for i in range(0, len(items), size)]


mcp = FastMCP(
    name="Spotify MCP Server",
    instructions=(
        "MCP server to query Spotify profile, search tracks, top tracks, library, "
        "listening history, and manage playlists. "
        "If not authenticated, user must open /auth/login first."
    ),
)


@mcp.tool()
def spotify_auth_status() -> dict[str, Any]:
    """Return whether the server already has a valid Spotify token."""
    return _token_status()


@mcp.tool()
def spotify_get_auth_url() -> dict[str, str]:
    """Generate the Spotify OAuth URL. Open it in a browser to connect the account."""
    state = _new_oauth_state()
    return {
        "auth_url": _build_auth_url(state),
        "callback": SPOTIFY_REDIRECT_URI,
        "note": "Open auth_url in browser and finish login/consent.",
    }


@mcp.tool()
def spotify_get_my_profile() -> dict[str, Any]:
    """Get current Spotify user profile (/me)."""
    me = _spotify_request("GET", "/me")
    return {
        "id": me.get("id"),
        "display_name": me.get("display_name"),
        "uri": me.get("uri"),
        "country": me.get("country"),
        "product": me.get("product"),
    }


@mcp.tool()
def spotify_search_tracks(query: str, limit: int = 5, offset: int = 0) -> dict[str, Any]:
    """Search tracks using Spotify /search endpoint.

    ``limit`` is clamped to 1..10 and ``offset`` to >= 0.
    """
    safe_limit = max(1, min(limit, 10))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/search",
        params={
            "q": query,
            "type": "track",
            "limit": safe_limit,
            "offset": safe_offset,
        },
    )
    tracks = payload.get("tracks") or {}
    items = [_short_track(t) for t in tracks.get("items", [])]
    return {
        "query": query,
        "limit": safe_limit,
        "offset": safe_offset,
        "total": tracks.get("total", 0),
        "items": items,
    }


@mcp.tool()
def spotify_list_saved_tracks(limit: int = 20, offset: int = 0) -> dict[str, Any]:
    """List user's saved (liked) tracks from library (/me/tracks).

    ``limit`` is clamped to 1..50 and ``offset`` to >= 0.
    """
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/me/tracks",
        params={"limit": safe_limit, "offset": safe_offset},
    )
    items: list[dict[str, Any]] = []
    for item in payload.get("items", []):
        track = _short_track(item.get("track") or {})
        track["added_at"] = item.get("added_at")
        items.append(track)
    return {
        "limit": safe_limit,
        "offset": safe_offset,
        "total": payload.get("total", 0),
        "items": items,
    }


@mcp.tool()
def spotify_list_recently_played(limit: int = 20) -> dict[str, Any]:
    """List recently played tracks (/me/player/recently-played).

    ``limit`` is clamped to 1..50.
    """
    safe_limit = max(1, min(limit, 50))
    payload = _spotify_request(
        "GET",
        "/me/player/recently-played",
        params={"limit": safe_limit},
    )
    items: list[dict[str, Any]] = []
    for item in payload.get("items", []):
        track = _short_track(item.get("track") or {})
        track["played_at"] = item.get("played_at")
        context = item.get("context") or {}
        track["context_type"] = context.get("type")
        track["context_uri"] = context.get("uri")
        items.append(track)
    return {
        "limit": safe_limit,
        "items": items,
        "cursors": payload.get("cursors"),
    }


@mcp.tool()
def spotify_get_top_tracks(
    time_range: str = "medium_term",
    limit: int = 20,
    offset: int = 0,
) -> dict[str, Any]:
    """Get user top tracks (/me/top/tracks).

    An unknown ``time_range`` falls back to "medium_term"; ``limit`` is
    clamped to 1..50 and ``offset`` to >= 0.
    """
    allowed_ranges = {"short_term", "medium_term", "long_term"}
    safe_range = time_range if time_range in allowed_ranges else "medium_term"
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/me/top/tracks",
        params={
            "time_range": safe_range,
            "limit": safe_limit,
            "offset": safe_offset,
        },
    )
    return {
        "time_range": safe_range,
        "limit": safe_limit,
        "offset": safe_offset,
        "total": payload.get("total", 0),
        "items": [_short_track(t) for t in payload.get("items", [])],
    }


@mcp.tool()
def spotify_check_saved_tracks(track_ids: list[str]) -> dict[str, Any]:
    """Check if tracks are saved in user's library (/me/tracks/contains)."""
    ids = _normalize_track_ids(track_ids)
    states: list[bool] = []
    for chunk in _chunked(ids, 50):
        payload = _spotify_request(
            "GET",
            "/me/tracks/contains",
            params={"ids": ",".join(chunk)},
        )
        # Exactly one flag per requested id is needed to pair them up below.
        if not isinstance(payload, list) or len(payload) != len(chunk):
            raise SpotifyAuthError("Unexpected response for /me/tracks/contains.")
        states.extend(bool(flag) for flag in payload)
    items = [
        {"id": track_id, "saved": saved} for track_id, saved in zip(ids, states)
    ]
    return {"total": len(ids), "items": items}


@mcp.tool()
def spotify_save_tracks(track_ids: list[str]) -> dict[str, Any]:
    """Save tracks to user library (PUT /me/tracks)."""
    ids = _normalize_track_ids(track_ids)
    for chunk in _chunked(ids, 50):
        _spotify_request(
            "PUT",
            "/me/tracks",
            params={"ids": ",".join(chunk)},
        )
    return {"saved": len(ids), "track_ids": ids}


@mcp.tool()
def spotify_remove_saved_tracks(track_ids: list[str]) -> dict[str, Any]:
    """Remove tracks from user library (DELETE /me/tracks)."""
    ids = _normalize_track_ids(track_ids)
    for chunk in _chunked(ids, 50):
        _spotify_request(
            "DELETE",
            "/me/tracks",
            params={"ids": ",".join(chunk)},
        )
    return {"removed": len(ids), "track_ids": ids}


@mcp.tool()
def spotify_set_tracks_saved(track_ids: list[str], saved: bool) -> dict[str, Any]:
    """Set saved state for tracks in user library (add/remove)."""
    if saved:
        result = spotify_save_tracks(track_ids)
        result["action"] = "saved"
        return result
    result = spotify_remove_saved_tracks(track_ids)
    result["action"] = "removed"
    return result


@mcp.tool()
def spotify_list_my_playlists(limit: int = 10, offset: int = 0) -> dict[str, Any]:
    """List playlists from current user (/me/playlists).

    ``limit`` is clamped to 1..50 and ``offset`` to >= 0.
    """
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/me/playlists",
        params={"limit": safe_limit, "offset": safe_offset},
    )
    return {
        "limit": safe_limit,
        "offset": safe_offset,
        "total": payload.get("total", 0),
        "items": [_short_playlist(p) for p in payload.get("items", [])],
    }


@mcp.tool()
def spotify_create_playlist(
    name: str,
    description: str = "",
    public: bool = False,
) -> dict[str, Any]:
    """Create a playlist in the current user's account."""
    if not name.strip():
        raise SpotifyAuthError("Playlist name cannot be empty.")
    required_scope = "playlist-modify-public" if public else "playlist-modify-private"
    _require_any_scope([required_scope], "create playlist")
    payload = _spotify_request(
        "POST",
        "/me/playlists",
        json_body={
            "name": name.strip(),
            "description": description,
            "public": public,
        },
    )
    return {
        "id": payload.get("id"),
        "name": payload.get("name"),
        "uri": payload.get("uri"),
        "public": payload.get("public"),
        "external_url": (payload.get("external_urls") or {}).get("spotify"),
    }


@mcp.tool()
def spotify_add_items_to_playlist(playlist_id: str, uris: list[str]) -> dict[str, Any]:
    """Add Spotify item URIs to a playlist (/playlists/{id}/items)."""
    _require_any_scope(
        ["playlist-modify-private", "playlist-modify-public"],
        "add items to playlist",
    )
    # Validate the normalized id: "spotify:playlist:" normalizes to "".
    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")
    clean_uris = [u.strip() for u in uris if u and u.strip()]
    if not clean_uris:
        raise SpotifyAuthError("Provide at least one URI.")
    if len(clean_uris) > 100:
        raise SpotifyAuthError("Spotify allows up to 100 URIs per request.")
    payload = _spotify_request(
        "POST",
        f"/playlists/{pid}/items",
        json_body={"uris": clean_uris},
    )
    return {
        "playlist_id": pid,
        "added": len(clean_uris),
        "snapshot_id": payload.get("snapshot_id"),
    }


@mcp.tool()
def spotify_update_playlist_details(
    playlist_id: str,
    name: str | None = None,
    description: str | None = None,
    public: bool | None = None,
    collaborative: bool | None = None,
) -> dict[str, Any]:
    """Update playlist metadata (PUT /playlists/{id}).

    Only the fields that are not None are sent. Making a playlist
    collaborative also sets ``public`` to False unless it was given.
    """
    _require_any_scope(
        ["playlist-modify-private", "playlist-modify-public"],
        "update playlist details",
    )
    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")

    body: dict[str, Any] = {}
    if name is not None:
        clean_name = name.strip()
        # Same rule as spotify_create_playlist: a blank name is rejected.
        if not clean_name:
            raise SpotifyAuthError("Playlist name cannot be empty.")
        body["name"] = clean_name
    if description is not None:
        body["description"] = description
    if public is not None:
        body["public"] = public
    if collaborative is not None:
        body["collaborative"] = collaborative
    if not body:
        raise SpotifyAuthError("Provide at least one field to update.")

    # Spotify collaborative playlists must be private.
    if body.get("collaborative") is True:
        if body.get("public") is True:
            raise SpotifyAuthError("Collaborative playlists cannot be public.")
        body.setdefault("public", False)

    _spotify_request(
        "PUT",
        f"/playlists/{pid}",
        json_body=body,
    )
    return {
        "playlist_id": pid,
        "updated_fields": list(body.keys()),
    }


@mcp.tool()
def spotify_replace_playlist_items(playlist_id: str, uris: list[str]) -> dict[str, Any]:
    """Replace all items in playlist (PUT /playlists/{id}/tracks)."""
    _require_any_scope(
        ["playlist-modify-private", "playlist-modify-public"],
        "replace playlist items",
    )
    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")
    clean_uris = [u.strip() for u in uris if u and u.strip()]
    if not clean_uris:
        raise SpotifyAuthError("Provide at least one URI.")
    if len(clean_uris) > 100:
        raise SpotifyAuthError("Spotify allows up to 100 URIs per request.")
    # NOTE(review): spotify_add_items_to_playlist posts to /playlists/{id}/items
    # while this tool uses /playlists/{id}/tracks - confirm which path the
    # targeted Spotify API version expects.
    payload = _spotify_request(
        "PUT",
        f"/playlists/{pid}/tracks",
        json_body={"uris": clean_uris},
    )
    return {
        "playlist_id": pid,
        "replaced_with": len(clean_uris),
        "snapshot_id": payload.get("snapshot_id"),
    }


@mcp.tool()
def spotify_delete_playlist(playlist_id: str) -> dict[str, Any]:
    """Delete playlist from library (DELETE /playlists/{id}/followers)."""
    _require_any_scope(
        ["playlist-modify-private", "playlist-modify-public"],
        "delete playlist",
    )
    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")
    _spotify_request(
        "DELETE",
        f"/playlists/{pid}/followers",
    )
    return {
        "playlist_id": pid,
        "deleted": True,
        "note": "Spotify performs unfollow; own playlists are removed from your profile.",
    }


app = FastAPI(title="Spotify MCP Server", version="1.0.0")
app.mount("/gradio_api/mcp", mcp.sse_app("/gradio_api/mcp"))


@app.get("/")
def root() -> dict[str, Any]:
    """Describe the service and list its auth and MCP entry points."""
    return {
        "service": "spotify-mcp-server",
        "auth_login_url": "/auth/login",
        "auth_reset_url": "/auth/reset",
        "auth_status_url": "/auth/status",
        "mcp_sse_path": "/gradio_api/mcp/sse",
        "public_mcp_sse_url": MCP_SSE_URL,
    }


@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/auth/status")
def auth_status() -> dict[str, Any]:
    """Report the stored token status."""
    return _token_status()


@app.get("/auth/reset")
def auth_reset() -> dict[str, Any]:
    """Delete the stored token file so the next login starts from scratch.

    Raises:
        HTTPException: 500 when the file exists but cannot be removed.
    """
    removed = False
    if SPOTIFY_TOKEN_FILE.exists():
        try:
            SPOTIFY_TOKEN_FILE.unlink(missing_ok=True)
            removed = True
        except OSError as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Could not remove token file: {exc}",
            ) from exc
    return {"reset": True, "removed_token_file": removed}


@app.get("/auth/login")
def auth_login(force: bool = Query(default=True)) -> RedirectResponse:
    """Redirect the browser to Spotify's authorization page.

    Args:
        force: Passed as ``show_dialog`` to the authorize URL.

    Raises:
        HTTPException: 500 when the Spotify configuration is incomplete.
    """
    try:
        state = _new_oauth_state()
        url = _build_auth_url(state, show_dialog=force)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return RedirectResponse(url=url, status_code=307)


@app.get("/auth/callback", response_class=HTMLResponse)
def auth_callback(
    code: str | None = Query(default=None),
    state: str | None = Query(default=None),
    error: str | None = Query(default=None),
) -> HTMLResponse:
    """OAuth redirect target: validate ``state`` and exchange ``code`` for tokens.

    Raises:
        HTTPException: 400 when Spotify reports an error, when ``code`` or
            ``state`` is missing, when the state is unknown or expired, or
            when the token exchange fails.
    """
    if error:
        raise HTTPException(
            status_code=400,
            detail=f"Spotify authorization error: {error}",
        )
    if not code or not state:
        raise HTTPException(status_code=400, detail="Missing code/state in callback.")
    if not _consume_oauth_state(state):
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")
    try:
        _exchange_code_for_tokens(code)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    html = """
Ya puedes volver a tu cliente MCP y usar las tools.
"""
    return HTMLResponse(content=html, status_code=200)


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "7860"))
    # Pass the app object rather than the "app:app" import string, so startup
    # does not depend on this file being named app.py and the module is not
    # imported a second time.
    uvicorn.run(app, host="0.0.0.0", port=port)