import base64
import json
import os
import secrets
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlencode

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from mcp.server.fastmcp import FastMCP

# Load variables from a .env file (if any) before the settings below are read.
load_dotenv()

# Base URLs of the two Spotify services this module talks to.
SPOTIFY_ACCOUNTS_BASE = "https://accounts.spotify.com"
SPOTIFY_API_BASE = "https://api.spotify.com/v1"

# Scopes requested during authorization when SPOTIFY_SCOPES is not set.
_DEFAULT_SPOTIFY_SCOPES = (
    "user-read-private "
    "user-read-email "
    "playlist-read-private "
    "playlist-read-collaborative "
    "playlist-modify-private "
    "playlist-modify-public"
)

# SSE endpoint advertised by the root route when MCP_PUBLIC_SSE_URL is not set.
_DEFAULT_MCP_SSE_URL = (
    "https://pharmaia-demo-mcp-server-spotify.hf.space/gradio_api/mcp/sse"
)

# Application credentials and redirect target; an unset variable becomes "".
SPOTIFY_CLIENT_ID = os.environ.get("SPOTIFY_CLIENT_ID", "").strip()
SPOTIFY_CLIENT_SECRET = os.environ.get("SPOTIFY_CLIENT_SECRET", "").strip()
SPOTIFY_REDIRECT_URI = os.environ.get("SPOTIFY_REDIRECT_URI", "").strip()
SPOTIFY_SCOPES = os.environ.get("SPOTIFY_SCOPES", _DEFAULT_SPOTIFY_SCOPES).strip()

# Absolute path of the JSON file in which the token payload is stored.
_token_file_name = os.environ.get("SPOTIFY_TOKEN_FILE", "spotify_tokens.json")
SPOTIFY_TOKEN_FILE = Path(_token_file_name).resolve()

MCP_SSE_URL = os.environ.get("MCP_PUBLIC_SSE_URL", _DEFAULT_MCP_SSE_URL).strip()

# Keep OAuth states in memory for CSRF protection.
# Pending OAuth states: state token -> creation time (Unix seconds). A state
# is accepted at most once and only within OAUTH_STATE_TTL_SECONDS.
OAUTH_STATE_TTL_SECONDS = 600
_oauth_states: dict[str, int] = {}

# A stored token is treated as expired this many seconds before `expires_at`.
_EXPIRY_MARGIN_SECONDS = 30

# Timeout, in seconds, applied to every outbound HTTP request.
_HTTP_TIMEOUT_SECONDS = 20.0


class SpotifyAuthError(RuntimeError):
    """Error raised by this module for configuration, auth and API failures."""


def _require_spotify_config() -> None:
    """Ensure the Spotify client id, client secret and redirect URI are set.

    Raises:
        SpotifyAuthError: if any of the three settings is empty; the message
            lists every missing environment variable.
    """
    required = {
        "SPOTIFY_CLIENT_ID": SPOTIFY_CLIENT_ID,
        "SPOTIFY_CLIENT_SECRET": SPOTIFY_CLIENT_SECRET,
        "SPOTIFY_REDIRECT_URI": SPOTIFY_REDIRECT_URI,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise SpotifyAuthError(
            "Missing required environment variables: " + ", ".join(missing)
        )


def _cleanup_expired_states() -> None:
    """Drop every pending OAuth state older than OAUTH_STATE_TTL_SECONDS."""
    now = int(time.time())
    expired = [
        key
        for key, created_at in _oauth_states.items()
        if (now - created_at) > OAUTH_STATE_TTL_SECONDS
    ]
    for key in expired:
        _oauth_states.pop(key, None)


def _new_oauth_state() -> str:
    """Create, register and return a fresh random OAuth state token."""
    _cleanup_expired_states()
    state = secrets.token_urlsafe(24)
    _oauth_states[state] = int(time.time())
    return state


def _consume_oauth_state(state: str) -> bool:
    """Validate `state` and remove it so it cannot be used a second time.

    Returns:
        True when the state was registered and is still within its TTL,
        False when it is unknown or too old.
    """
    _cleanup_expired_states()
    created_at = _oauth_states.pop(state, None)
    if created_at is None:
        return False
    return (int(time.time()) - created_at) <= OAUTH_STATE_TTL_SECONDS


def _spotify_basic_auth_header() -> str:
    """Return base64("client_id:client_secret") for a Basic auth header."""
    raw = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8")
    return base64.b64encode(raw).decode("utf-8")


def _load_tokens() -> dict[str, Any] | None:
    """Read the stored token payload from SPOTIFY_TOKEN_FILE.

    Returns:
        The stored dict, or None when the file is missing, unreadable, not
        valid UTF-8/JSON, or does not contain a JSON object.
    """
    try:
        loaded = json.loads(SPOTIFY_TOKEN_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return None
    # Callers use `.get`, so anything other than a JSON object counts as
    # "no tokens" instead of failing later with AttributeError.
    return loaded if isinstance(loaded, dict) else None


def _save_tokens(
    token_payload: dict[str, Any],
    previous: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Merge a token response with earlier tokens and write it to disk.

    Args:
        token_payload: token response to store; `expires_in` defaults to
            3600 seconds when the key is absent.
        previous: previously stored tokens, if any. Its keys are kept unless
            `token_payload` overrides them.

    Returns:
        The merged dict as written to SPOTIFY_TOKEN_FILE, including the
        added `saved_at` and `expires_at` Unix timestamps.
    """
    now = int(time.time())
    expires_in = int(token_payload.get("expires_in", 3600))

    merged: dict[str, Any] = {}
    if previous:
        merged.update(previous)
    merged.update(token_payload)
    # Keep the earlier refresh token when the new payload carries none
    # (or an empty one).
    if not merged.get("refresh_token") and previous:
        merged["refresh_token"] = previous.get("refresh_token")

    merged["saved_at"] = now
    merged["expires_at"] = now + expires_in
    SPOTIFY_TOKEN_FILE.write_text(
        json.dumps(merged, indent=2, ensure_ascii=True),
        encoding="utf-8",
    )
    return merged


def _is_expired(tokens: dict[str, Any]) -> bool:
    """Return True when the access token is expired or about to expire.

    A missing, null or non-numeric `expires_at` is treated as expired so the
    caller attempts a refresh instead of failing on the conversion.
    """
    try:
        expires_at = int(tokens.get("expires_at", 0))
    except (TypeError, ValueError):
        return True
    return int(time.time()) >= (expires_at - _EXPIRY_MARGIN_SECONDS)


def _error_payload(response: httpx.Response) -> Any:
    """Return the decoded JSON body of `response`, or its text as a fallback."""
    try:
        return response.json()
    except ValueError:
        return {"error": response.text}


def _post_token_request(data: dict[str, Any], action: str) -> dict[str, Any]:
    """POST a form-encoded grant to the Spotify token endpoint.

    Args:
        data: form fields of the grant request.
        action: word used in the error message ("exchange" or "refresh").

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        SpotifyAuthError: if the endpoint answers with any other status.
    """
    headers = {
        "Authorization": f"Basic {_spotify_basic_auth_header()}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    with httpx.Client(timeout=_HTTP_TIMEOUT_SECONDS) as client:
        response = client.post(
            f"{SPOTIFY_ACCOUNTS_BASE}/api/token",
            data=data,
            headers=headers,
        )
    if response.status_code != 200:
        raise SpotifyAuthError(
            f"Failed token {action}: {_error_payload(response)}"
        )
    return response.json()


def _exchange_code_for_tokens(code: str) -> dict[str, Any]:
    """Exchange an authorization code for tokens and store them.

    Raises:
        SpotifyAuthError: if configuration is incomplete or the exchange fails.
    """
    _require_spotify_config()
    token_response = _post_token_request(
        {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
        },
        "exchange",
    )
    return _save_tokens(token_response)


def _refresh_access_token(tokens: dict[str, Any]) -> dict[str, Any]:
    """Obtain a new access token using the stored refresh token.

    Args:
        tokens: currently stored tokens; must contain `refresh_token`.

    Returns:
        The merged token dict after it has been written to disk.

    Raises:
        SpotifyAuthError: if there is no refresh token or the refresh fails.
    """
    refresh_token = tokens.get("refresh_token")
    if not refresh_token:
        raise SpotifyAuthError(
            "No refresh token found. Re-authenticate at /auth/login."
        )
    token_response = _post_token_request(
        {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        },
        "refresh",
    )
    return _save_tokens(token_response, previous=tokens)


def _get_valid_access_token() -> str:
    """Return a usable access token, refreshing the stored one if needed.

    Raises:
        SpotifyAuthError: if configuration is incomplete, no tokens are
            stored, the refresh fails, or no access token is available.
    """
    _require_spotify_config()
    tokens = _load_tokens()
    if not tokens:
        raise SpotifyAuthError(
            "Spotify is not authenticated yet. Open /auth/login first."
        )
    if _is_expired(tokens):
        tokens = _refresh_access_token(tokens)

    access_token = tokens.get("access_token")
    if not access_token:
        raise SpotifyAuthError(
            "Access token missing. Re-authenticate at /auth/login."
        )
    return str(access_token)


def _spotify_request(
    method: str,
    path: str,
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Call the Spotify Web API with the stored credentials.

    Args:
        method: HTTP method.
        path: path appended to SPOTIFY_API_BASE (starts with "/").
        params: optional query parameters.
        json_body: optional JSON request body.

    Returns:
        The decoded JSON body, `{}` for an empty body, or `{"raw": text}`
        when the body is not valid JSON.

    Raises:
        SpotifyAuthError: if no valid token can be obtained or the final
            response status is 400 or higher.
    """
    token = _get_valid_access_token()
    headers = {"Authorization": f"Bearer {token}"}
    url = f"{SPOTIFY_API_BASE}{path}"

    with httpx.Client(timeout=_HTTP_TIMEOUT_SECONDS) as client:
        response = client.request(
            method,
            url,
            params=params,
            json=json_body,
            headers=headers,
        )
        if response.status_code == 401:
            # The token was rejected even though it looked valid locally:
            # refresh once and repeat the same request.
            refreshed = _refresh_access_token(_load_tokens() or {})
            headers["Authorization"] = f"Bearer {refreshed.get('access_token', '')}"
            response = client.request(
                method,
                url,
                params=params,
                json=json_body,
                headers=headers,
            )

    if response.status_code >= 400:
        raise SpotifyAuthError(
            f"Spotify API error {response.status_code}: {_error_payload(response)}"
        )
    if not response.content:
        return {}
    try:
        return response.json()
    except ValueError:
        return {"raw": response.text}


def _build_auth_url(state: str) -> str:
    """Build the Spotify authorization URL carrying the given `state`.

    Raises:
        SpotifyAuthError: if required configuration is missing.
    """
    _require_spotify_config()
    query = urlencode(
        {
            "response_type": "code",
            "client_id": SPOTIFY_CLIENT_ID,
            "scope": SPOTIFY_SCOPES,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
            "state": state,
            "show_dialog": "false",
        }
    )
    return f"{SPOTIFY_ACCOUNTS_BASE}/authorize?{query}"


def _token_status() -> dict[str, Any]:
    """Summarize the stored tokens without exposing the token values.

    `authenticated` only reflects whether an access token is stored; expiry
    is reported separately through `expires_at` and is not evaluated here.
    """
    tokens = _load_tokens() or {}
    return {
        "authenticated": bool(tokens.get("access_token")),
        "token_file": str(SPOTIFY_TOKEN_FILE),
        "expires_at": tokens.get("expires_at"),
        "scopes": tokens.get("scope"),
    }


def _short_playlist(playlist: dict[str, Any]) -> dict[str, Any]:
    """Reduce a playlist object to the fields returned by the tools."""
    return {
        "id": playlist.get("id"),
        "name": playlist.get("name"),
        "uri": playlist.get("uri"),
        "public": playlist.get("public"),
        "owner": (playlist.get("owner") or {}).get("id"),
        "total_items": (playlist.get("tracks") or {}).get("total"),
        "external_url": (playlist.get("external_urls") or {}).get("spotify"),
    }


def _short_track(track: dict[str, Any]) -> dict[str, Any]:
    """Reduce a track object to the fields returned by the tools."""
    return {
        "id": track.get("id"),
        "name": track.get("name"),
        "uri": track.get("uri"),
        # `or []` also covers an explicit null, like the `or {}` lookups below.
        "artists": [a.get("name") for a in track.get("artists") or []],
        "album": (track.get("album") or {}).get("name"),
        "external_url": (track.get("external_urls") or {}).get("spotify"),
    }


mcp = FastMCP(
    name="Spotify MCP Server",
    instructions=(
        "MCP server to query Spotify profile, search tracks, and manage playlists. "
        "If not authenticated, user must open /auth/login first."
    ),
)

# NOTE(review): the docstrings of the tool functions below are presumably
# surfaced to MCP clients as tool descriptions, so they are kept verbatim;
# additional explanation lives in `#` comments.


@mcp.tool()
def spotify_auth_status() -> dict[str, Any]:
    """Return whether the server already has a valid Spotify token."""
    # Reports whether an access token is stored; expiry is not checked.
    return _token_status()


@mcp.tool()
def spotify_get_auth_url() -> dict[str, str]:
    """Generate the Spotify OAuth URL. Open it in a browser to connect the account."""
    # The state is registered here so the later callback can be validated.
    state = _new_oauth_state()
    return {
        "auth_url": _build_auth_url(state),
        "callback": SPOTIFY_REDIRECT_URI,
        "note": "Open auth_url in browser and finish login/consent.",
    }


@mcp.tool()
def spotify_get_my_profile() -> dict[str, Any]:
    """Get current Spotify user profile (/me)."""
    me = _spotify_request("GET", "/me")
    return {
        "id": me.get("id"),
        "display_name": me.get("display_name"),
        "uri": me.get("uri"),
        "country": me.get("country"),
        "product": me.get("product"),
    }


@mcp.tool()
def spotify_search_tracks(query: str, limit: int = 5, offset: int = 0) -> dict[str, Any]:
    """Search tracks using Spotify /search endpoint."""
    # Clamp paging values: limit to 1..10, offset to >= 0.
    safe_limit = max(1, min(limit, 10))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/search",
        params={
            "q": query,
            "type": "track",
            "limit": safe_limit,
            "offset": safe_offset,
        },
    )
    tracks = payload.get("tracks") or {}
    return {
        "query": query,
        "limit": safe_limit,
        "offset": safe_offset,
        "total": tracks.get("total", 0),
        "items": [_short_track(t) for t in tracks.get("items") or []],
    }


@mcp.tool()
def spotify_list_my_playlists(limit: int = 10, offset: int = 0) -> dict[str, Any]:
    """List playlists from current user (/me/playlists)."""
    # Clamp paging values: limit to 1..50, offset to >= 0.
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/me/playlists",
        params={"limit": safe_limit, "offset": safe_offset},
    )
    return {
        "limit": safe_limit,
        "offset": safe_offset,
        "total": payload.get("total", 0),
        "items": [_short_playlist(p) for p in payload.get("items") or []],
    }


@mcp.tool()
def spotify_create_playlist(
    name: str,
    description: str = "",
    public: bool = False,
) -> dict[str, Any]:
    """Create a playlist in the current user's account."""
    clean_name = name.strip()
    if not clean_name:
        raise SpotifyAuthError("Playlist name cannot be empty.")

    # The creation endpoint is addressed by user id, so resolve it first.
    me = _spotify_request("GET", "/me")
    user_id = str(me.get("id", "")).strip()
    if not user_id:
        raise SpotifyAuthError("Could not resolve current Spotify user id.")

    payload = _spotify_request(
        "POST",
        f"/users/{user_id}/playlists",
        json_body={
            "name": clean_name,
            "description": description,
            "public": public,
        },
    )
    return {
        "id": payload.get("id"),
        "name": payload.get("name"),
        "uri": payload.get("uri"),
        "public": payload.get("public"),
        "external_url": (payload.get("external_urls") or {}).get("spotify"),
    }


@mcp.tool()
def spotify_add_items_to_playlist(playlist_id: str, uris: list[str]) -> dict[str, Any]:
    """Add Spotify item URIs to a playlist (/playlists/{id}/items)."""
    from urllib.parse import quote

    clean_uris = [u.strip() for u in uris if u and u.strip()]
    clean_playlist_id = playlist_id.strip()
    if not clean_playlist_id:
        raise SpotifyAuthError("playlist_id is required.")
    if not clean_uris:
        raise SpotifyAuthError("Provide at least one URI.")
    if len(clean_uris) > 100:
        raise SpotifyAuthError("Spotify allows up to 100 URIs per request.")

    # The id comes from the tool caller: percent-encode it so characters
    # such as "/", "?" or "#" cannot change the request path.
    payload = _spotify_request(
        "POST",
        f"/playlists/{quote(clean_playlist_id, safe='')}/items",
        json_body={"uris": clean_uris},
    )
    return {
        "playlist_id": clean_playlist_id,
        "added": len(clean_uris),
        "snapshot_id": payload.get("snapshot_id"),
    }


app = FastAPI(title="Spotify MCP Server", version="1.0.0")
# Serve the MCP SSE application under /gradio_api/mcp.
app.mount("/gradio_api/mcp", mcp.sse_app("/gradio_api/mcp"))


@app.get("/")
def root() -> dict[str, Any]:
    """Return the service name and the URLs of its main endpoints."""
    return {
        "service": "spotify-mcp-server",
        "auth_login_url": "/auth/login",
        "auth_status_url": "/auth/status",
        "mcp_sse_path": "/gradio_api/mcp/sse",
        "public_mcp_sse_url": MCP_SSE_URL,
    }


@app.get("/health")
def health() -> dict[str, str]:
    """Return a static liveness response."""
    return {"status": "ok"}


@app.get("/auth/status")
def auth_status() -> dict[str, Any]:
    """Return the stored-token summary produced by `_token_status`."""
    return _token_status()


@app.get("/auth/login")
def auth_login() -> RedirectResponse:
    """Redirect the browser to the Spotify authorization page.

    Raises:
        HTTPException: 500 when required configuration is missing.
    """
    try:
        state = _new_oauth_state()
        url = _build_auth_url(state)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return RedirectResponse(url=url, status_code=307)


@app.get("/auth/callback", response_class=HTMLResponse)
def auth_callback(
    code: str | None = Query(default=None),
    state: str | None = Query(default=None),
    error: str | None = Query(default=None),
) -> HTMLResponse:
    """Handle the OAuth redirect: validate the state and store the tokens.

    Raises:
        HTTPException: 400 when Spotify reports an error, when `code` or
            `state` is missing, when the state is unknown or expired, or
            when the token exchange fails.
    """
    if error:
        raise HTTPException(status_code=400, detail=f"Spotify authorization error: {error}")
    if not code or not state:
        raise HTTPException(status_code=400, detail="Missing code/state in callback.")
    if not _consume_oauth_state(state):
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")

    try:
        _exchange_code_for_tokens(code)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # User-facing confirmation page (Spanish), reproduced verbatim.
    # NOTE(review): the text contains no HTML tags; the markup may have been
    # lost at some point - confirm against the deployed version.
    html = (
        " Spotify conectado\n"
        "\n"
        "Spotify conectado correctamente\n"
        "\n"
        "Ya puedes volver a tu cliente MCP y usar las tools.\n"
        "\n"
    )
    return HTMLResponse(content=html, status_code=200)


if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces; the port comes from PORT (default 7860).
    port = int(os.getenv("PORT", "7860"))
    uvicorn.run("app:app", host="0.0.0.0", port=port)