Spaces:
Sleeping
Sleeping
| import base64 | |
| import json | |
| import os | |
| import secrets | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib.parse import urlencode | |
| import httpx | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.responses import HTMLResponse, RedirectResponse | |
| from mcp.server.fastmcp import FastMCP | |
| load_dotenv() | |
def _env_str(name: str, default: str = "") -> str:
    """Read environment variable *name* (or *default*) with whitespace trimmed."""
    return os.environ.get(name, default).strip()


# Base URLs of the Spotify accounts service and the Web API.
SPOTIFY_ACCOUNTS_BASE = "https://accounts.spotify.com"
SPOTIFY_API_BASE = "https://api.spotify.com/v1"

# OAuth client settings; an empty string means "not configured".
SPOTIFY_CLIENT_ID = _env_str("SPOTIFY_CLIENT_ID")
SPOTIFY_CLIENT_SECRET = _env_str("SPOTIFY_CLIENT_SECRET")
SPOTIFY_REDIRECT_URI = _env_str("SPOTIFY_REDIRECT_URI")

# Space-separated scopes requested during authorization.
SPOTIFY_SCOPES = _env_str(
    "SPOTIFY_SCOPES",
    " ".join(
        (
            "user-read-private",
            "user-read-email",
            "playlist-read-private",
            "playlist-read-collaborative",
            "playlist-modify-private",
            "playlist-modify-public",
        )
    ),
)

# Absolute path of the JSON file where tokens are persisted.
_token_file_setting = os.environ.get("SPOTIFY_TOKEN_FILE", "spotify_tokens.json")
SPOTIFY_TOKEN_FILE = Path(_token_file_setting).resolve()

# Public SSE URL reported by the root endpoint.
MCP_SSE_URL = _env_str(
    "MCP_PUBLIC_SSE_URL",
    "https://pharmaia-demo-mcp-server-spotify.hf.space/gradio_api/mcp/sse",
)

# Keep OAuth states in memory for CSRF protection.
# Maps each issued state value to the Unix timestamp (seconds) of issuance.
OAUTH_STATE_TTL_SECONDS = 10 * 60
_oauth_states: dict[str, int] = dict()
class SpotifyAuthError(RuntimeError):
    """Error raised by this module.

    Used for missing configuration, failed OAuth token operations, Spotify
    API error responses, and invalid tool arguments.
    """
def _require_spotify_config() -> None:
    """Ensure the Spotify client id, client secret and redirect URI are set.

    Raises:
        SpotifyAuthError: If any of the three settings is empty; the message
            lists the names of the missing environment variables.
    """
    settings = {
        "SPOTIFY_CLIENT_ID": SPOTIFY_CLIENT_ID,
        "SPOTIFY_CLIENT_SECRET": SPOTIFY_CLIENT_SECRET,
        "SPOTIFY_REDIRECT_URI": SPOTIFY_REDIRECT_URI,
    }
    absent = [name for name, value in settings.items() if not value]
    if absent:
        raise SpotifyAuthError(
            "Missing required environment variables: " + ", ".join(absent)
        )
def _cleanup_expired_states() -> None:
    """Drop every stored OAuth state older than ``OAUTH_STATE_TTL_SECONDS``."""
    # A state is stale once it was issued strictly before this cutoff.
    cutoff = int(time.time()) - OAUTH_STATE_TTL_SECONDS
    stale = [value for value, issued_at in _oauth_states.items() if issued_at < cutoff]
    for value in stale:
        _oauth_states.pop(value, None)
def _new_oauth_state() -> str:
    """Create, remember and return a fresh random OAuth ``state`` value.

    Expired states are purged first; the new state is stored together with
    its creation time so it can be validated later.
    """
    _cleanup_expired_states()
    issued_at = int(time.time())
    fresh = secrets.token_urlsafe(24)
    _oauth_states[fresh] = issued_at
    return fresh
def _consume_oauth_state(state: str) -> bool:
    """Validate *state* and remove it so it can only be used once.

    Returns:
        True when the state was issued by this process and is still within
        ``OAUTH_STATE_TTL_SECONDS``; False otherwise.
    """
    _cleanup_expired_states()
    issued_at = _oauth_states.pop(state, None)
    if issued_at is not None:
        age = int(time.time()) - issued_at
        return age <= OAUTH_STATE_TTL_SECONDS
    return False
def _spotify_basic_auth_header() -> str:
    """Return base64 of ``client_id:client_secret`` for an HTTP Basic header."""
    credentials = ":".join((SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET))
    encoded = base64.b64encode(credentials.encode("utf-8"))
    return encoded.decode("utf-8")
def _load_tokens() -> dict[str, Any] | None:
    """Load the persisted token payload from ``SPOTIFY_TOKEN_FILE``.

    Returns:
        The stored JSON object, or None when the file is missing, unreadable,
        not valid UTF-8/JSON, or does not contain a JSON object.
    """
    try:
        # Read directly instead of checking exists() first: avoids a race
        # between the check and the read (a missing file raises OSError).
        raw = SPOTIFY_TOKEN_FILE.read_text(encoding="utf-8")
        data = json.loads(raw)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    # Callers use .get() on the result, so anything but a dict is unusable.
    return data if isinstance(data, dict) else None
def _save_tokens(token_payload: dict[str, Any], previous: dict[str, Any] | None = None) -> dict[str, Any]:
    """Merge a token response with earlier tokens and persist the result.

    Args:
        token_payload: Token response to store. ``expires_in`` (seconds) is
            read from it, defaulting to 3600 when absent or null.
        previous: Previously stored tokens, if any. Its values are kept for
            keys the new payload lacks, and its ``refresh_token`` is reused
            when the new payload does not carry one.

    Returns:
        The merged dictionary, including ``saved_at`` and ``expires_at``
        (Unix timestamps in seconds).

    Raises:
        OSError: If the token file cannot be written.
    """
    now = int(time.time())
    raw_expires_in = token_payload.get("expires_in")
    expires_in = int(raw_expires_in) if raw_expires_in is not None else 3600

    merged: dict[str, Any] = {}
    if previous:
        merged.update(previous)
    merged.update(token_payload)
    if not merged.get("refresh_token") and previous:
        merged["refresh_token"] = previous.get("refresh_token")
    merged["saved_at"] = now
    merged["expires_at"] = now + expires_in

    serialized = json.dumps(merged, indent=2, ensure_ascii=True)

    # Write to a sibling temp file created with owner-only permissions, then
    # atomically swap it in: the file holds OAuth secrets, and a partially
    # written file would later be treated as "not authenticated".
    tmp_path = SPOTIFY_TOKEN_FILE.with_name(SPOTIFY_TOKEN_FILE.name + ".tmp")
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(serialized)
        os.replace(tmp_path, SPOTIFY_TOKEN_FILE)
    except OSError:
        tmp_path.unlink(missing_ok=True)
        raise
    return merged
def _is_expired(tokens: dict[str, Any]) -> bool:
    """Tell whether the stored access token should be considered expired.

    Args:
        tokens: Stored token dictionary; ``expires_at`` is read as a Unix
            timestamp in seconds.

    Returns:
        True when the token expires within the next 30 seconds, has already
        expired, or when ``expires_at`` is missing, null or not numeric.
    """
    try:
        expires_at = int(tokens.get("expires_at") or 0)
    except (TypeError, ValueError):
        # An unreadable expiry is treated as expired so a refresh is attempted.
        return True
    # The 30-second margin avoids sending a token that expires mid-request.
    return int(time.time()) >= (expires_at - 30)
def _exchange_code_for_tokens(code: str) -> dict[str, Any]:
    """Trade an authorization *code* for tokens and persist them.

    Returns:
        The stored token dictionary as returned by ``_save_tokens``.

    Raises:
        SpotifyAuthError: If configuration is missing or the token endpoint
            answers with a status other than 200.
    """
    _require_spotify_config()
    token_url = f"{SPOTIFY_ACCOUNTS_BASE}/api/token"
    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": SPOTIFY_REDIRECT_URI,
    }
    request_headers = {
        "Authorization": f"Basic {_spotify_basic_auth_header()}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    with httpx.Client(timeout=20.0) as client:
        response = client.post(token_url, data=form, headers=request_headers)

    if response.status_code == 200:
        return _save_tokens(response.json())

    # Prefer the structured error body; fall back to the raw text.
    try:
        details = response.json()
    except ValueError:
        details = {"error": response.text}
    raise SpotifyAuthError(f"Failed token exchange: {details}")
def _refresh_access_token(tokens: dict[str, Any]) -> dict[str, Any]:
    """Obtain a new access token using the stored refresh token.

    Args:
        tokens: Currently stored tokens; must contain ``refresh_token``.

    Returns:
        The updated token dictionary as returned by ``_save_tokens`` (merged
        with *tokens*).

    Raises:
        SpotifyAuthError: If there is no refresh token or the token endpoint
            answers with a status other than 200.
    """
    stored_refresh = tokens.get("refresh_token")
    if not stored_refresh:
        raise SpotifyAuthError(
            "No refresh token found. Re-authenticate at /auth/login."
        )

    token_url = f"{SPOTIFY_ACCOUNTS_BASE}/api/token"
    form = {
        "grant_type": "refresh_token",
        "refresh_token": stored_refresh,
    }
    request_headers = {
        "Authorization": f"Basic {_spotify_basic_auth_header()}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    with httpx.Client(timeout=20.0) as client:
        response = client.post(token_url, data=form, headers=request_headers)

    if response.status_code == 200:
        return _save_tokens(response.json(), previous=tokens)

    # Prefer the structured error body; fall back to the raw text.
    try:
        details = response.json()
    except ValueError:
        details = {"error": response.text}
    raise SpotifyAuthError(f"Failed token refresh: {details}")
def _get_valid_access_token() -> str:
    """Return a usable access token, refreshing the stored one if expired.

    Raises:
        SpotifyAuthError: If configuration is missing, no tokens are stored,
            the refresh fails, or the resulting payload has no access token.
    """
    _require_spotify_config()
    stored = _load_tokens()
    if not stored:
        raise SpotifyAuthError(
            "Spotify is not authenticated yet. Open /auth/login first."
        )

    current = _refresh_access_token(stored) if _is_expired(stored) else stored
    bearer = current.get("access_token")
    if bearer:
        return str(bearer)
    raise SpotifyAuthError(
        "Access token missing. Re-authenticate at /auth/login."
    )
def _spotify_request(
    method: str,
    path: str,
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Call the Spotify Web API with the stored bearer token.

    A 401 response triggers one token refresh followed by a single retry.

    Args:
        method: HTTP method.
        path: Path appended to ``SPOTIFY_API_BASE``.
        params: Optional query parameters.
        json_body: Optional JSON request body.

    Returns:
        The decoded JSON response, ``{}`` for an empty body, or
        ``{"raw": <text>}`` when the body is not valid JSON.

    Raises:
        SpotifyAuthError: If no valid token can be obtained or the final
            response has a status code of 400 or above.
    """
    auth_headers = {"Authorization": f"Bearer {_get_valid_access_token()}"}
    url = f"{SPOTIFY_API_BASE}{path}"

    with httpx.Client(timeout=20.0) as client:

        def send():
            # Reads auth_headers at call time so the retry uses the new token.
            return client.request(
                method,
                url,
                params=params,
                json=json_body,
                headers=auth_headers,
            )

        response = send()
        if response.status_code == 401:
            renewed = _refresh_access_token(_load_tokens() or {})
            auth_headers["Authorization"] = f"Bearer {renewed.get('access_token', '')}"
            response = send()

    if response.status_code >= 400:
        try:
            details = response.json()
        except ValueError:
            details = {"error": response.text}
        raise SpotifyAuthError(f"Spotify API error {response.status_code}: {details}")

    if response.content:
        try:
            return response.json()
        except ValueError:
            return {"raw": response.text}
    return {}
def _build_auth_url(state: str) -> str:
    """Build the Spotify ``/authorize`` URL carrying the given *state*.

    Raises:
        SpotifyAuthError: If required configuration is missing.
    """
    _require_spotify_config()
    query = urlencode(
        {
            "response_type": "code",
            "client_id": SPOTIFY_CLIENT_ID,
            "scope": SPOTIFY_SCOPES,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
            "state": state,
            "show_dialog": "false",
        }
    )
    return f"{SPOTIFY_ACCOUNTS_BASE}/authorize?{query}"
def _token_status() -> dict[str, Any]:
    """Summarize the stored token state.

    ``authenticated`` only reflects that an access token is stored; expiry
    is reported separately through ``expires_at`` and is not checked here.
    """
    stored = _load_tokens() or {}
    has_access_token = bool(stored.get("access_token"))
    status: dict[str, Any] = {"authenticated": has_access_token}
    status["token_file"] = str(SPOTIFY_TOKEN_FILE)
    status["expires_at"] = stored.get("expires_at")
    status["scopes"] = stored.get("scope")
    return status
def _short_playlist(playlist: dict[str, Any]) -> dict[str, Any]:
    """Reduce a playlist object to a compact summary.

    Missing or null nested objects (``owner``, ``tracks``, ``external_urls``)
    yield None for the corresponding summary field.
    """
    owner = playlist.get("owner") or {}
    tracks = playlist.get("tracks") or {}
    links = playlist.get("external_urls") or {}

    summary: dict[str, Any] = {
        field: playlist.get(field) for field in ("id", "name", "uri", "public")
    }
    summary["owner"] = owner.get("id")
    summary["total_items"] = tracks.get("total")
    summary["external_url"] = links.get("spotify")
    return summary
def _short_track(track: dict[str, Any]) -> dict[str, Any]:
    """Reduce a track object to a compact summary.

    Args:
        track: Track object as a dictionary.

    Returns:
        A dict with ``id``, ``name``, ``uri``, ``artists`` (list of artist
        names), ``album`` (album name) and ``external_url``. Missing or null
        nested objects yield None, or an empty list for ``artists``.
    """
    # ``or []`` / ``or {}`` also cover keys that are present with a null
    # value, which a plain ``.get(key, default)`` would not.
    artists = track.get("artists") or []
    album = track.get("album") or {}
    links = track.get("external_urls") or {}
    return {
        "id": track.get("id"),
        "name": track.get("name"),
        "uri": track.get("uri"),
        "artists": [a.get("name") for a in artists if isinstance(a, dict)],
        "album": album.get("name"),
        "external_url": links.get("spotify"),
    }
# Instructions advertised to MCP clients that connect to this server.
_MCP_INSTRUCTIONS = (
    "MCP server to query Spotify profile, search tracks, and manage playlists. "
    "If not authenticated, user must open /auth/login first."
)

# FastMCP server instance for the Spotify tools.
mcp = FastMCP(name="Spotify MCP Server", instructions=_MCP_INSTRUCTIONS)
# Registered explicitly: without the decorator the function is never exposed
# to MCP clients.
@mcp.tool()
def spotify_auth_status() -> dict[str, Any]:
    """Return whether the server has a stored Spotify access token.

    The result also reports the token file path, the recorded expiry
    timestamp and the granted scopes. Expiry is reported, not checked.
    """
    return _token_status()
# Registered explicitly: without the decorator the function is never exposed
# to MCP clients.
@mcp.tool()
def spotify_get_auth_url() -> dict[str, str]:
    """Generate the Spotify OAuth URL. Open it in a browser to connect the account.

    A fresh OAuth state is issued for every call.

    Returns:
        A dict with ``auth_url``, the configured ``callback`` URI and a
        short usage ``note``.

    Raises:
        SpotifyAuthError: If required configuration is missing.
    """
    state = _new_oauth_state()
    return {
        "auth_url": _build_auth_url(state),
        "callback": SPOTIFY_REDIRECT_URI,
        "note": "Open auth_url in browser and finish login/consent.",
    }
# Registered explicitly: without the decorator the function is never exposed
# to MCP clients.
@mcp.tool()
def spotify_get_my_profile() -> dict[str, Any]:
    """Get current Spotify user profile (/me).

    Returns:
        A dict with the ``id``, ``display_name``, ``uri``, ``country`` and
        ``product`` fields of the profile (None for fields not returned).

    Raises:
        SpotifyAuthError: If not authenticated or the API call fails.
    """
    me = _spotify_request("GET", "/me")
    fields = ("id", "display_name", "uri", "country", "product")
    return {field: me.get(field) for field in fields}
# Registered explicitly: without the decorator the function is never exposed
# to MCP clients.
@mcp.tool()
def spotify_search_tracks(query: str, limit: int = 5, offset: int = 0) -> dict[str, Any]:
    """Search tracks using Spotify /search endpoint.

    Args:
        query: Search text; must not be blank.
        limit: Number of results, clamped to the range 1-10.
        offset: Index of the first result, clamped to be non-negative.

    Returns:
        A dict echoing ``query``, the effective ``limit`` and ``offset``, the
        ``total`` match count and the summarized ``items``.

    Raises:
        SpotifyAuthError: If the query is blank, the server is not
            authenticated, or the API call fails.
    """
    # Fail early, like the other tools do for their required arguments,
    # instead of spending a request on an empty search.
    if not query.strip():
        raise SpotifyAuthError("Search query cannot be empty.")

    safe_limit = max(1, min(limit, 10))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/search",
        params={
            "q": query,
            "type": "track",
            "limit": safe_limit,
            "offset": safe_offset,
        },
    )
    tracks = payload.get("tracks") or {}
    # ``or []`` covers a null ``items``; null entries are skipped because
    # _short_track needs a dict.
    items = [_short_track(t) for t in (tracks.get("items") or []) if t]
    return {
        "query": query,
        "limit": safe_limit,
        "offset": safe_offset,
        "total": tracks.get("total", 0),
        "items": items,
    }
# Registered explicitly: without the decorator the function is never exposed
# to MCP clients.
@mcp.tool()
def spotify_list_my_playlists(limit: int = 10, offset: int = 0) -> dict[str, Any]:
    """List playlists from current user (/me/playlists).

    Args:
        limit: Number of playlists, clamped to the range 1-50.
        offset: Index of the first playlist, clamped to be non-negative.

    Returns:
        A dict with the effective ``limit`` and ``offset``, the ``total``
        count and the summarized ``items``.

    Raises:
        SpotifyAuthError: If not authenticated or the API call fails.
    """
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)
    payload = _spotify_request(
        "GET",
        "/me/playlists",
        params={"limit": safe_limit, "offset": safe_offset},
    )
    # ``or []`` covers a null ``items``; null entries are skipped because
    # _short_playlist needs a dict.
    playlists = [p for p in (payload.get("items") or []) if p]
    return {
        "limit": safe_limit,
        "offset": safe_offset,
        "total": payload.get("total", 0),
        "items": [_short_playlist(p) for p in playlists],
    }
# Registered explicitly: without the decorator the function is never exposed
# to MCP clients.
@mcp.tool()
def spotify_create_playlist(
    name: str,
    description: str = "",
    public: bool = False,
) -> dict[str, Any]:
    """Create a playlist in the current user's account.

    Args:
        name: Playlist name; surrounding whitespace is removed and the result
            must not be empty.
        description: Optional playlist description.
        public: Whether the playlist is public.

    Returns:
        A dict with the new playlist's ``id``, ``name``, ``uri``, ``public``
        flag and ``external_url``.

    Raises:
        SpotifyAuthError: If the name is blank, the current user id cannot be
            resolved, the server is not authenticated, or an API call fails.
    """
    clean_name = name.strip()
    if not clean_name:
        raise SpotifyAuthError("Playlist name cannot be empty.")

    me = _spotify_request("GET", "/me")
    # ``or ""`` matters: str(None) would produce the literal "None", which
    # passes the emptiness check and ends up in the request path.
    user_id = str(me.get("id") or "").strip()
    if not user_id:
        raise SpotifyAuthError("Could not resolve current Spotify user id.")

    payload = _spotify_request(
        "POST",
        f"/users/{user_id}/playlists",
        json_body={
            "name": clean_name,
            "description": description,
            "public": public,
        },
    )
    return {
        "id": payload.get("id"),
        "name": payload.get("name"),
        "uri": payload.get("uri"),
        "public": payload.get("public"),
        "external_url": (payload.get("external_urls") or {}).get("spotify"),
    }
# Registered explicitly: without the decorator the function is never exposed
# to MCP clients.
@mcp.tool()
def spotify_add_items_to_playlist(playlist_id: str, uris: list[str]) -> dict[str, Any]:
    """Add Spotify item URIs to a playlist (/playlists/{id}/items).

    Args:
        playlist_id: Target playlist id; must not be blank.
        uris: Item URIs to add. Blank entries are dropped; between 1 and 100
            URIs must remain.

    Returns:
        A dict with the ``playlist_id``, the number of URIs ``added`` and the
        ``snapshot_id`` from the response.

    Raises:
        SpotifyAuthError: If an argument is invalid, the server is not
            authenticated, or the API call fails.
    """
    clean_uris = [u.strip() for u in uris if u and u.strip()]
    # Normalize once and reuse for validation, the request path and the result.
    clean_playlist_id = playlist_id.strip()
    if not clean_playlist_id:
        raise SpotifyAuthError("playlist_id is required.")
    if not clean_uris:
        raise SpotifyAuthError("Provide at least one URI.")
    if len(clean_uris) > 100:
        raise SpotifyAuthError("Spotify allows up to 100 URIs per request.")

    payload = _spotify_request(
        "POST",
        f"/playlists/{clean_playlist_id}/items",
        json_body={"uris": clean_uris},
    )
    return {
        "playlist_id": clean_playlist_id,
        "added": len(clean_uris),
        "snapshot_id": payload.get("snapshot_id"),
    }
# FastAPI application hosting the HTTP endpoints; the MCP SSE transport is
# mounted as a sub-application under the same prefix it is told about.
_MCP_MOUNT_PATH = "/gradio_api/mcp"
app = FastAPI(title="Spotify MCP Server", version="1.0.0")
_mcp_sse_app = mcp.sse_app(_MCP_MOUNT_PATH)
app.mount(_MCP_MOUNT_PATH, _mcp_sse_app)
# Registered explicitly: without a route decorator this handler is
# unreachable over HTTP.
@app.get("/")
def root() -> dict[str, Any]:
    """Describe the service and point to its auth and MCP endpoints."""
    return {
        "service": "spotify-mcp-server",
        "auth_login_url": "/auth/login",
        "auth_status_url": "/auth/status",
        "mcp_sse_path": "/gradio_api/mcp/sse",
        "public_mcp_sse_url": MCP_SSE_URL,
    }
# Registered explicitly: without a route decorator this handler is
# unreachable over HTTP.
# NOTE(review): "/health" is the conventional path and is not referenced
# elsewhere in this file; confirm it matches the deployment's health probe.
@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe: always reports ``{"status": "ok"}``."""
    return {"status": "ok"}
# Registered explicitly: without a route decorator this handler is
# unreachable over HTTP. The path matches the ``auth_status_url`` advertised
# by the root endpoint.
@app.get("/auth/status")
def auth_status() -> dict[str, Any]:
    """Report the stored token status (same data as the MCP status tool)."""
    return _token_status()
# Registered explicitly: without a route decorator this handler is
# unreachable over HTTP. The path matches the "/auth/login" URL that the
# error messages and the root endpoint point users to.
@app.get("/auth/login")
def auth_login() -> RedirectResponse:
    """Start the OAuth flow by redirecting the browser to Spotify.

    Raises:
        HTTPException: 500 when required Spotify configuration is missing.
    """
    try:
        state = _new_oauth_state()
        url = _build_auth_url(state)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return RedirectResponse(url=url, status_code=307)
# Registered explicitly: without a route decorator this handler is
# unreachable over HTTP.
# NOTE(review): the path must equal the path component of
# SPOTIFY_REDIRECT_URI; "/auth/callback" is assumed here - confirm against
# the redirect URI registered for the Spotify application.
@app.get("/auth/callback")
def auth_callback(
    code: str | None = Query(default=None),
    state: str | None = Query(default=None),
    error: str | None = Query(default=None),
) -> HTMLResponse:
    """Finish the OAuth flow: validate the state and store the tokens.

    Args:
        code: Authorization code from the redirect query string.
        state: OAuth state from the redirect; must have been issued by this
            server and not be expired.
        error: Error code from the redirect, if authorization failed.

    Returns:
        A small HTML confirmation page on success.

    Raises:
        HTTPException: 400 when an error is reported, ``code`` or ``state``
            is missing, the state is invalid or expired, or the token
            exchange fails.
    """
    if error:
        raise HTTPException(status_code=400, detail=f"Spotify authorization error: {error}")
    if not code or not state:
        raise HTTPException(status_code=400, detail="Missing code/state in callback.")
    # The state is single-use: it is removed from the store when checked.
    if not _consume_oauth_state(state):
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")
    try:
        _exchange_code_for_tokens(code)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    html = """
<html>
<head><title>Spotify conectado</title></head>
<body style='font-family: Arial, sans-serif; padding: 24px;'>
<h2>Spotify conectado correctamente</h2>
<p>Ya puedes volver a tu cliente MCP y usar las tools.</p>
</body>
</html>
"""
    return HTMLResponse(content=html, status_code=200)
if __name__ == "__main__":
    # Imported lazily so uvicorn is only needed when run as a script.
    import uvicorn

    # Listening port comes from PORT, falling back to 7860.
    listen_port = int(os.environ.get("PORT", "7860"))
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port)