Spaces:
Sleeping
Sleeping
| import base64 | |
| import json | |
| import os | |
| import secrets | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib.parse import urlencode | |
| import httpx | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, HTTPException, Query | |
| from fastapi.responses import HTMLResponse, RedirectResponse | |
| from mcp.server.fastmcp import FastMCP | |
# Load variables from a .env file (if any) before reading the environment.
load_dotenv()

# Base URLs for the Spotify accounts service and the Web API.
SPOTIFY_ACCOUNTS_BASE = "https://accounts.spotify.com"
SPOTIFY_API_BASE = "https://api.spotify.com/v1"

# OAuth client settings; an unset variable becomes "" and is reported later
# by _require_spotify_config().
SPOTIFY_CLIENT_ID = os.environ.get("SPOTIFY_CLIENT_ID", "").strip()
SPOTIFY_CLIENT_SECRET = os.environ.get("SPOTIFY_CLIENT_SECRET", "").strip()
SPOTIFY_REDIRECT_URI = os.environ.get("SPOTIFY_REDIRECT_URI", "").strip()

# Space-separated scopes requested from the environment (with a default set).
_SPOTIFY_SCOPES_RAW = os.environ.get(
    "SPOTIFY_SCOPES",
    "user-read-private user-read-email playlist-read-private "
    "playlist-read-collaborative playlist-modify-private playlist-modify-public "
    "user-library-read user-read-recently-played",
).strip()

# Scopes that are always requested, even when SPOTIFY_SCOPES omits them.
_REQUIRED_SCOPES = [
    "user-read-private",
    "user-read-email",
    "playlist-read-private",
    "playlist-read-collaborative",
    "playlist-modify-private",
    "playlist-modify-public",
    "user-library-read",
    "user-library-modify",
    "user-read-recently-played",
    "user-top-read",
]

# Keep the configured order and append any required scope that is missing.
_scope_parts = _SPOTIFY_SCOPES_RAW.split()
_scope_parts += [
    _scope for _scope in _REQUIRED_SCOPES if _scope not in _scope_parts
]
SPOTIFY_SCOPES = " ".join(_scope_parts)

# Absolute path of the JSON file where OAuth tokens are persisted.
SPOTIFY_TOKEN_FILE = Path(
    os.environ.get("SPOTIFY_TOKEN_FILE", "spotify_tokens.json")
).resolve()

# Public SSE URL advertised by the root endpoint.
MCP_SSE_URL = os.environ.get(
    "MCP_PUBLIC_SSE_URL",
    "https://pharmaia-demo-mcp-server-spotify.hf.space/gradio_api/mcp/sse",
).strip()
# In-memory registry of pending OAuth "state" values used for CSRF protection.
# Maps each issued state token to the Unix timestamp (seconds) of its creation;
# entries older than OAUTH_STATE_TTL_SECONDS are treated as expired.
OAUTH_STATE_TTL_SECONDS = 10 * 60
_oauth_states: dict[str, int] = dict()
class SpotifyAuthError(RuntimeError):
    """Error raised by this module for Spotify auth, config and API failures."""
def _require_spotify_config() -> None:
    """Ensure the Spotify OAuth client settings are present.

    Raises:
        SpotifyAuthError: If SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET or
            SPOTIFY_REDIRECT_URI is empty; the message lists every missing one.
    """
    settings = (
        ("SPOTIFY_CLIENT_ID", SPOTIFY_CLIENT_ID),
        ("SPOTIFY_CLIENT_SECRET", SPOTIFY_CLIENT_SECRET),
        ("SPOTIFY_REDIRECT_URI", SPOTIFY_REDIRECT_URI),
    )
    absent = [label for label, value in settings if not value]
    if absent:
        raise SpotifyAuthError(
            "Missing required environment variables: " + ", ".join(absent)
        )
def _cleanup_expired_states() -> None:
    """Drop every stored OAuth state older than OAUTH_STATE_TTL_SECONDS."""
    # A state is stale when it was issued before this cutoff timestamp.
    cutoff = int(time.time()) - OAUTH_STATE_TTL_SECONDS
    # Collect first, then remove: the dict must not change while iterating it.
    stale = [token for token, issued_at in _oauth_states.items() if issued_at < cutoff]
    for token in stale:
        _oauth_states.pop(token, None)
def _new_oauth_state() -> str:
    """Create, register and return a fresh random OAuth state token.

    Expired states are purged first; the new token is stored together with
    its creation time so it can be validated later.
    """
    _cleanup_expired_states()
    token = secrets.token_urlsafe(24)
    issued_at = int(time.time())
    _oauth_states[token] = issued_at
    return token
def _consume_oauth_state(state: str) -> bool:
    """Validate and invalidate an OAuth state token (single use).

    Args:
        state: The state value received on the OAuth callback.

    Returns:
        True when the state was issued by this process and is still within
        OAUTH_STATE_TTL_SECONDS; False when unknown or expired. The state is
        removed from the store either way.
    """
    _cleanup_expired_states()
    issued_at = _oauth_states.pop(state, None)
    if issued_at is None:
        return False
    age_seconds = int(time.time()) - issued_at
    return age_seconds <= OAUTH_STATE_TTL_SECONDS
def _spotify_basic_auth_header() -> str:
    """Return base64("client_id:client_secret") for an HTTP Basic auth header.

    Only the encoded credential is returned; callers prepend "Basic ".
    """
    credentials = ":".join((SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET))
    encoded = base64.b64encode(credentials.encode("utf-8"))
    return encoded.decode("utf-8")
def _load_tokens() -> dict[str, Any] | None:
    """Read the persisted token payload from SPOTIFY_TOKEN_FILE.

    Returns:
        The stored token dictionary, or None when the file is missing,
        unreadable, not valid UTF-8/JSON, or does not contain a JSON object.
    """
    try:
        # Read directly instead of checking exists() first: avoids a race with
        # a concurrent reset, and a missing file is just another OSError.
        raw = SPOTIFY_TOKEN_FILE.read_text(encoding="utf-8")
        data = json.loads(raw)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    # Callers use .get() on the result, so anything but a JSON object is
    # treated like a corrupt file.
    if not isinstance(data, dict):
        return None
    return data
def _save_tokens(token_payload: dict[str, Any], previous: dict[str, Any] | None = None) -> dict[str, Any]:
    """Merge a token response with earlier tokens and persist the result.

    Args:
        token_payload: Token response from Spotify's token endpoint.
        previous: Previously stored tokens, if any. Their values are kept
            unless overridden by ``token_payload``; in particular the old
            refresh token is retained when the new payload has none.

    Returns:
        The merged dictionary that was written, including ``saved_at`` and
        ``expires_at`` (Unix seconds).

    Raises:
        OSError: If the token file cannot be written.
    """
    now = int(time.time())
    expires_in = int(token_payload.get("expires_in", 3600))

    merged: dict[str, Any] = {**(previous or {}), **token_payload}
    # A payload may carry an empty/None refresh token; fall back to the old one.
    if not merged.get("refresh_token") and previous:
        merged["refresh_token"] = previous.get("refresh_token")
    merged["saved_at"] = now
    merged["expires_at"] = now + expires_in

    serialized = json.dumps(merged, indent=2, ensure_ascii=True)

    # Write to a sibling temp file created with owner-only permissions, then
    # atomically swap it in: the file holds OAuth secrets, and readers must
    # never observe a half-written document.
    tmp_path = SPOTIFY_TOKEN_FILE.with_name(SPOTIFY_TOKEN_FILE.name + ".tmp")
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(serialized)
    os.replace(tmp_path, SPOTIFY_TOKEN_FILE)
    return merged
| def _is_expired(tokens: dict[str, Any]) -> bool: | |
| expires_at = int(tokens.get("expires_at", 0)) | |
| return int(time.time()) >= (expires_at - 30) | |
def _exchange_code_for_tokens(code: str) -> dict[str, Any]:
    """Trade an OAuth authorization code for tokens and persist them.

    Args:
        code: Authorization code received on the OAuth callback.

    Returns:
        The token dictionary as stored by ``_save_tokens``.

    Raises:
        SpotifyAuthError: If configuration is incomplete or the token
            endpoint answers with a status other than 200.
    """
    _require_spotify_config()
    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": SPOTIFY_REDIRECT_URI,
    }
    request_headers = {
        "Authorization": f"Basic {_spotify_basic_auth_header()}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    token_url = f"{SPOTIFY_ACCOUNTS_BASE}/api/token"
    with httpx.Client(timeout=20.0) as client:
        response = client.post(token_url, data=form, headers=request_headers)

    if response.status_code == 200:
        return _save_tokens(response.json())

    # Surface whatever the endpoint said, JSON when possible, raw text otherwise.
    try:
        details = response.json()
    except ValueError:
        details = {"error": response.text}
    raise SpotifyAuthError(f"Failed token exchange: {details}")
def _refresh_access_token(tokens: dict[str, Any]) -> dict[str, Any]:
    """Obtain a new access token using the stored refresh token.

    Args:
        tokens: Currently stored tokens; must contain ``refresh_token``.

    Returns:
        The updated token dictionary as stored by ``_save_tokens`` (merged
        with ``tokens`` so values absent from the response are kept).

    Raises:
        SpotifyAuthError: If no refresh token is available or the token
            endpoint answers with a status other than 200.
    """
    stored_refresh = tokens.get("refresh_token")
    if not stored_refresh:
        raise SpotifyAuthError(
            "No refresh token found. Re-authenticate at /auth/login."
        )

    form = {
        "grant_type": "refresh_token",
        "refresh_token": stored_refresh,
    }
    request_headers = {
        "Authorization": f"Basic {_spotify_basic_auth_header()}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    token_url = f"{SPOTIFY_ACCOUNTS_BASE}/api/token"
    with httpx.Client(timeout=20.0) as client:
        response = client.post(token_url, data=form, headers=request_headers)

    if response.status_code == 200:
        return _save_tokens(response.json(), previous=tokens)

    # Surface whatever the endpoint said, JSON when possible, raw text otherwise.
    try:
        details = response.json()
    except ValueError:
        details = {"error": response.text}
    raise SpotifyAuthError(f"Failed token refresh: {details}")
def _get_valid_access_token() -> str:
    """Return a usable access token, refreshing it first when expired.

    Raises:
        SpotifyAuthError: If configuration is incomplete, no tokens are
            stored, the refresh fails, or the stored data has no access token.
    """
    _require_spotify_config()

    stored = _load_tokens()
    if not stored:
        raise SpotifyAuthError(
            "Spotify is not authenticated yet. Open /auth/login first."
        )

    current = _refresh_access_token(stored) if _is_expired(stored) else stored
    bearer = current.get("access_token")
    if bearer:
        return str(bearer)
    raise SpotifyAuthError(
        "Access token missing. Re-authenticate at /auth/login."
    )
def _spotify_request(
    method: str,
    path: str,
    params: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Perform an authenticated Spotify Web API call and decode the response.

    A 401 response triggers one token refresh followed by a single retry.

    Args:
        method: HTTP method name.
        path: API path appended to SPOTIFY_API_BASE (starts with "/").
        params: Optional query parameters.
        json_body: Optional JSON request body.

    Returns:
        The decoded JSON response (returned as-is, so endpoints that answer
        with a JSON array yield a list), ``{}`` for an empty body, or
        ``{"raw": <text>}`` when the body is not JSON.

    Raises:
        SpotifyAuthError: If no valid token can be obtained or the API
            answers with a status of 400 or above.
    """
    url = f"{SPOTIFY_API_BASE}{path}"
    auth_headers = {"Authorization": f"Bearer {_get_valid_access_token()}"}

    with httpx.Client(timeout=20.0) as client:
        response = client.request(
            method, url, params=params, json=json_body, headers=auth_headers
        )
        if response.status_code == 401:
            # The token was rejected: refresh once and replay the same request.
            renewed = _refresh_access_token(_load_tokens() or {})
            auth_headers["Authorization"] = f"Bearer {renewed.get('access_token', '')}"
            response = client.request(
                method, url, params=params, json=json_body, headers=auth_headers
            )

    status = response.status_code
    if status >= 400:
        try:
            details = response.json()
        except ValueError:
            details = {"error": response.text}

        if status == 403:
            # The error field may be an object with "message" or a plain string.
            error_field = details.get("error") if isinstance(details, dict) else None
            reason = ""
            if isinstance(error_field, dict):
                reason = str(error_field.get("message", "")).strip()
            elif isinstance(error_field, str):
                reason = error_field.strip()
            if "Insufficient client scope" in reason:
                raise SpotifyAuthError(
                    "Spotify token without required scopes. Re-authenticate at "
                    "/auth/reset then /auth/login (force consent), and confirm "
                    "SPOTIFY_SCOPES includes playlist-modify-private and "
                    "playlist-modify-public."
                )
        raise SpotifyAuthError(f"Spotify API error {status}: {details}")

    if not response.content:
        return {}
    try:
        return response.json()
    except ValueError:
        return {"raw": response.text}
def _build_auth_url(state: str, show_dialog: bool = True) -> str:
    """Compose the Spotify authorization URL for the OAuth code flow.

    Args:
        state: CSRF state token to round-trip through Spotify.
        show_dialog: Sent as the ``show_dialog`` query flag ("true"/"false").

    Raises:
        SpotifyAuthError: If the OAuth client configuration is incomplete.
    """
    _require_spotify_config()
    dialog_flag = "true" if show_dialog else "false"
    query = urlencode(
        {
            "response_type": "code",
            "client_id": SPOTIFY_CLIENT_ID,
            "scope": SPOTIFY_SCOPES,
            "redirect_uri": SPOTIFY_REDIRECT_URI,
            "state": state,
            "show_dialog": dialog_flag,
        }
    )
    return f"{SPOTIFY_ACCOUNTS_BASE}/authorize?{query}"
def _token_status() -> dict[str, Any]:
    """Summarize the stored token state without contacting Spotify.

    ``authenticated`` only reflects that an access token is stored; the token
    is not checked for expiry or validity here.
    """
    stored = _load_tokens() or {}
    has_access_token = bool(stored.get("access_token"))
    summary: dict[str, Any] = {
        "authenticated": has_access_token,
        "token_file": str(SPOTIFY_TOKEN_FILE),
        "expires_at": stored.get("expires_at"),
        "scopes": stored.get("scope"),
    }
    return summary
def _token_scope_set() -> set[str]:
    """Return the scopes recorded in the stored token as a set (may be empty)."""
    stored = _load_tokens() or {}
    granted = str(stored.get("scope", "")).strip()
    # str.split() without arguments never yields empty items.
    return set(granted.split())
def _require_any_scope(required_scopes: list[str], action: str) -> None:
    """Fail early when the stored token lacks all of the given scopes.

    The check is skipped when no scope information is stored at all, leaving
    the decision to the Spotify API.

    Args:
        required_scopes: Scopes of which at least one must be granted.
        action: Human-readable action name used in the error message.

    Raises:
        SpotifyAuthError: If scopes are known and none of the required ones
            is among them.
    """
    granted = _token_scope_set()
    if not granted or not granted.isdisjoint(required_scopes):
        return
    wanted = ", ".join(required_scopes)
    raise SpotifyAuthError(
        f"Missing scope for {action}. Need one of: {wanted}. "
        "Run /auth/reset and /auth/login to re-authorize with updated scopes."
    )
| def _short_playlist(playlist: dict[str, Any]) -> dict[str, Any]: | |
| return { | |
| "id": playlist.get("id"), | |
| "name": playlist.get("name"), | |
| "uri": playlist.get("uri"), | |
| "public": playlist.get("public"), | |
| "owner": (playlist.get("owner") or {}).get("id"), | |
| "total_items": (playlist.get("tracks") or {}).get("total"), | |
| "external_url": (playlist.get("external_urls") or {}).get("spotify"), | |
| } | |
| def _short_track(track: dict[str, Any]) -> dict[str, Any]: | |
| return { | |
| "id": track.get("id"), | |
| "name": track.get("name"), | |
| "uri": track.get("uri"), | |
| "artists": [a.get("name") for a in track.get("artists", [])], | |
| "album": (track.get("album") or {}).get("name"), | |
| "external_url": (track.get("external_urls") or {}).get("spotify"), | |
| } | |
| def _normalize_track_id(track_ref: str) -> str: | |
| value = track_ref.strip() | |
| if not value: | |
| return "" | |
| if value.startswith("spotify:track:"): | |
| return value.split(":")[-1].strip() | |
| if "open.spotify.com/track/" in value: | |
| return value.split("open.spotify.com/track/")[-1].split("?")[0].strip().strip("/") | |
| return value | |
def _normalize_track_ids(track_ids: list[str], max_items: int = 500) -> list[str]:
    """Normalize track references, dropping blanks and duplicates.

    Args:
        track_ids: Track ids, URIs or share URLs.
        max_items: Upper bound on the number of distinct ids accepted.

    Returns:
        Distinct normalized ids in first-seen order.

    Raises:
        SpotifyAuthError: If nothing valid remains or the bound is exceeded.
    """
    # A dict keeps insertion order, giving an ordered de-duplication.
    unique: dict[str, None] = {}
    for reference in track_ids:
        track_id = _normalize_track_id(reference)
        if track_id:
            unique.setdefault(track_id, None)

    result = list(unique)
    if not result:
        raise SpotifyAuthError("Provide at least one valid track id or track uri.")
    if len(result) > max_items:
        raise SpotifyAuthError(f"Too many track ids. Maximum allowed is {max_items}.")
    return result
| def _normalize_playlist_id(playlist_ref: str) -> str: | |
| value = playlist_ref.strip() | |
| if not value: | |
| return "" | |
| if value.startswith("spotify:playlist:"): | |
| return value.split(":")[-1].strip() | |
| if "open.spotify.com/playlist/" in value: | |
| return value.split("open.spotify.com/playlist/")[-1].split("?")[0].strip().strip("/") | |
| return value | |
| def _chunked(items: list[str], size: int) -> list[list[str]]: | |
| return [items[i : i + size] for i in range(0, len(items), size)] | |
# MCP server instance; the instructions text is the guidance passed to FastMCP.
mcp = FastMCP(
    name="Spotify MCP Server",
    instructions=(
        "MCP server to query Spotify profile, search tracks, top tracks, "
        "library, listening history, and manage playlists. "
        "If not authenticated, user must open /auth/login first."
    ),
)
def spotify_auth_status() -> dict[str, Any]:
    """Return whether the server already has a valid Spotify token.

    The result is the stored-token summary from ``_token_status``:
    ``authenticated``, ``token_file``, ``expires_at`` and ``scopes``.
    """
    status = _token_status()
    return status
def spotify_get_auth_url() -> dict[str, str]:
    """Generate the Spotify OAuth URL. Open it in a browser to connect the account.

    A new CSRF state is registered for the generated URL.

    Raises:
        SpotifyAuthError: If the OAuth client configuration is incomplete.
    """
    authorize_url = _build_auth_url(_new_oauth_state())
    return {
        "auth_url": authorize_url,
        "callback": SPOTIFY_REDIRECT_URI,
        "note": "Open auth_url in browser and finish login/consent.",
    }
def spotify_get_my_profile() -> dict[str, Any]:
    """Get current Spotify user profile (/me).

    Returns:
        A dictionary with ``id``, ``display_name``, ``uri``, ``country`` and
        ``product`` taken from the API response (None when absent).
    """
    profile = _spotify_request("GET", "/me")
    wanted_fields = ("id", "display_name", "uri", "country", "product")
    return {field: profile.get(field) for field in wanted_fields}
def spotify_search_tracks(query: str, limit: int = 5, offset: int = 0) -> dict[str, Any]:
    """Search tracks using Spotify /search endpoint.

    Args:
        query: Search text sent as ``q``.
        limit: Page size; clamped to the range 1..10.
        offset: Index of the first result; negative values become 0.

    Returns:
        The effective ``query``/``limit``/``offset``, the reported ``total``
        and the summarized track ``items``.
    """
    safe_limit = max(1, min(limit, 10))
    safe_offset = max(0, offset)
    search_params = {
        "q": query,
        "type": "track",
        "limit": safe_limit,
        "offset": safe_offset,
    }
    response = _spotify_request("GET", "/search", params=search_params)

    track_page = response.get("tracks") or {}
    summaries = [_short_track(entry) for entry in track_page.get("items", [])]
    return {
        "query": query,
        "limit": safe_limit,
        "offset": safe_offset,
        "total": track_page.get("total", 0),
        "items": summaries,
    }
def spotify_list_saved_tracks(limit: int = 20, offset: int = 0) -> dict[str, Any]:
    """List user's saved (liked) tracks from library (/me/tracks).

    Args:
        limit: Page size; clamped to the range 1..50.
        offset: Index of the first item; negative values become 0.

    Returns:
        The effective ``limit``/``offset``, the reported ``total`` and the
        summarized tracks, each extended with its ``added_at`` value.
    """
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)
    page = _spotify_request(
        "GET",
        "/me/tracks",
        params={"limit": safe_limit, "offset": safe_offset},
    )
    summaries: list[dict[str, Any]] = [
        {**_short_track(entry.get("track") or {}), "added_at": entry.get("added_at")}
        for entry in page.get("items", [])
    ]
    return {
        "limit": safe_limit,
        "offset": safe_offset,
        "total": page.get("total", 0),
        "items": summaries,
    }
def spotify_list_recently_played(limit: int = 20) -> dict[str, Any]:
    """List recently played tracks (/me/player/recently-played).

    Args:
        limit: Number of entries; clamped to the range 1..50.

    Returns:
        The effective ``limit``, the summarized tracks (each with
        ``played_at``, ``context_type`` and ``context_uri``) and the
        ``cursors`` object from the response.
    """
    safe_limit = max(1, min(limit, 50))
    page = _spotify_request(
        "GET",
        "/me/player/recently-played",
        params={"limit": safe_limit},
    )

    summaries: list[dict[str, Any]] = []
    for entry in page.get("items", []):
        playback_context = entry.get("context") or {}
        summaries.append(
            {
                **_short_track(entry.get("track") or {}),
                "played_at": entry.get("played_at"),
                "context_type": playback_context.get("type"),
                "context_uri": playback_context.get("uri"),
            }
        )
    return {
        "limit": safe_limit,
        "items": summaries,
        "cursors": page.get("cursors"),
    }
def spotify_get_top_tracks(
    time_range: str = "medium_term",
    limit: int = 20,
    offset: int = 0,
) -> dict[str, Any]:
    """Get user top tracks (/me/top/tracks).

    Args:
        time_range: "short_term", "medium_term" or "long_term"; any other
            value silently falls back to "medium_term".
        limit: Page size; clamped to the range 1..50.
        offset: Index of the first item; negative values become 0.

    Returns:
        The effective ``time_range``/``limit``/``offset``, the reported
        ``total`` and the summarized track ``items``.
    """
    safe_range = "medium_term"
    if time_range in {"short_term", "medium_term", "long_term"}:
        safe_range = time_range
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)

    request_params = {
        "time_range": safe_range,
        "limit": safe_limit,
        "offset": safe_offset,
    }
    page = _spotify_request("GET", "/me/top/tracks", params=request_params)
    return {
        "time_range": safe_range,
        "limit": safe_limit,
        "offset": safe_offset,
        "total": page.get("total", 0),
        "items": [_short_track(entry) for entry in page.get("items", [])],
    }
def spotify_check_saved_tracks(track_ids: list[str]) -> dict[str, Any]:
    """Check if tracks are saved in user's library (/me/tracks/contains).

    Args:
        track_ids: Track ids, URIs or share URLs (normalized and
            de-duplicated before use).

    Returns:
        ``total`` (number of distinct ids) and ``items``, one
        ``{"id", "saved"}`` entry per id in input order.

    Raises:
        SpotifyAuthError: If no valid id is given, the API call fails, or a
            response is not a list with exactly one flag per requested id.
    """
    ids = _normalize_track_ids(track_ids)
    states: list[bool] = []
    # Ids are sent in batches of 50 per request.
    for chunk in _chunked(ids, 50):
        payload = _spotify_request(
            "GET",
            "/me/tracks/contains",
            params={"ids": ",".join(chunk)},
        )
        # Require one flag per id; otherwise results could not be paired with
        # ids reliably (previously a short list surfaced as an IndexError).
        if not isinstance(payload, list) or len(payload) != len(chunk):
            raise SpotifyAuthError("Unexpected response for /me/tracks/contains.")
        states.extend(bool(flag) for flag in payload)

    items = [
        {"id": track_id, "saved": is_saved}
        for track_id, is_saved in zip(ids, states)
    ]
    return {"total": len(ids), "items": items}
def spotify_save_tracks(track_ids: list[str]) -> dict[str, Any]:
    """Save tracks to user library (PUT /me/tracks).

    Args:
        track_ids: Track ids, URIs or share URLs (normalized and
            de-duplicated before use).

    Returns:
        ``saved`` (number of ids sent) and the normalized ``track_ids``.
    """
    ids = _normalize_track_ids(track_ids)
    # Ids are sent in batches of 50 per request.
    for batch in _chunked(ids, 50):
        id_list = ",".join(batch)
        _spotify_request("PUT", "/me/tracks", params={"ids": id_list})
    return {"saved": len(ids), "track_ids": ids}
def spotify_remove_saved_tracks(track_ids: list[str]) -> dict[str, Any]:
    """Remove tracks from user library (DELETE /me/tracks).

    Args:
        track_ids: Track ids, URIs or share URLs (normalized and
            de-duplicated before use).

    Returns:
        ``removed`` (number of ids sent) and the normalized ``track_ids``.
    """
    ids = _normalize_track_ids(track_ids)
    # Ids are sent in batches of 50 per request.
    for batch in _chunked(ids, 50):
        id_list = ",".join(batch)
        _spotify_request("DELETE", "/me/tracks", params={"ids": id_list})
    return {"removed": len(ids), "track_ids": ids}
def spotify_set_tracks_saved(track_ids: list[str], saved: bool) -> dict[str, Any]:
    """Set saved state for tracks in user library (add/remove).

    Args:
        track_ids: Track ids, URIs or share URLs.
        saved: True to save the tracks, False to remove them.

    Returns:
        The result of the save or remove operation, extended with
        ``action`` set to "saved" or "removed".
    """
    if saved:
        outcome = spotify_save_tracks(track_ids)
        action = "saved"
    else:
        outcome = spotify_remove_saved_tracks(track_ids)
        action = "removed"
    outcome["action"] = action
    return outcome
def spotify_list_my_playlists(limit: int = 10, offset: int = 0) -> dict[str, Any]:
    """List playlists from current user (/me/playlists).

    Args:
        limit: Page size; clamped to the range 1..50.
        offset: Index of the first item; negative values become 0.

    Returns:
        The effective ``limit``/``offset``, the reported ``total`` and the
        summarized playlist ``items``.
    """
    safe_limit = max(1, min(limit, 50))
    safe_offset = max(0, offset)
    paging = {"limit": safe_limit, "offset": safe_offset}
    page = _spotify_request("GET", "/me/playlists", params=paging)
    summaries = [_short_playlist(entry) for entry in page.get("items", [])]
    return {
        "limit": safe_limit,
        "offset": safe_offset,
        "total": page.get("total", 0),
        "items": summaries,
    }
def spotify_create_playlist(
    name: str,
    description: str = "",
    public: bool = False,
) -> dict[str, Any]:
    """Create a playlist in the current user's account.

    Args:
        name: Playlist name; must not be blank (sent stripped).
        description: Playlist description.
        public: Whether the playlist is public; also decides which
            playlist-modify scope is checked.

    Returns:
        ``id``, ``name``, ``uri``, ``public`` and ``external_url`` of the
        created playlist as reported by the API.

    Raises:
        SpotifyAuthError: If the name is blank, the scope check fails or the
            API call fails.
    """
    clean_name = name.strip()
    if not clean_name:
        raise SpotifyAuthError("Playlist name cannot be empty.")

    needed_scope = "playlist-modify-private"
    if public:
        needed_scope = "playlist-modify-public"
    _require_any_scope([needed_scope], "create playlist")

    created = _spotify_request(
        "POST",
        "/me/playlists",
        json_body={
            "name": clean_name,
            "description": description,
            "public": public,
        },
    )
    links = created.get("external_urls") or {}
    return {
        "id": created.get("id"),
        "name": created.get("name"),
        "uri": created.get("uri"),
        "public": created.get("public"),
        "external_url": links.get("spotify"),
    }
def spotify_add_items_to_playlist(playlist_id: str, uris: list[str]) -> dict[str, Any]:
    """Add Spotify item URIs to a playlist (/playlists/{id}/items).

    Args:
        playlist_id: Playlist id, ``spotify:playlist:`` URI or share URL.
        uris: Item URIs to add; blank entries are ignored. At most 100.

    Returns:
        The normalized ``playlist_id``, the number of URIs ``added`` and the
        ``snapshot_id`` reported by the API.

    Raises:
        SpotifyAuthError: If the scope check fails, the playlist reference
            normalizes to nothing, no URI or more than 100 URIs are given,
            or the API call fails.
    """
    _require_any_scope(
        ["playlist-modify-private", "playlist-modify-public"],
        "add items to playlist",
    )
    clean_uris = [u.strip() for u in uris if u and u.strip()]

    # Validate the *normalized* id (as the other playlist functions do): a
    # reference such as "spotify:playlist:" is non-blank but yields an empty
    # id, which would otherwise build the path "/playlists//items".
    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")
    if not clean_uris:
        raise SpotifyAuthError("Provide at least one URI.")
    if len(clean_uris) > 100:
        raise SpotifyAuthError("Spotify allows up to 100 URIs per request.")

    payload = _spotify_request(
        "POST",
        f"/playlists/{pid}/items",
        json_body={"uris": clean_uris},
    )
    return {
        "playlist_id": pid,
        "added": len(clean_uris),
        "snapshot_id": payload.get("snapshot_id"),
    }
def spotify_update_playlist_details(
    playlist_id: str,
    name: str | None = None,
    description: str | None = None,
    public: bool | None = None,
    collaborative: bool | None = None,
) -> dict[str, Any]:
    """Update playlist metadata (PUT /playlists/{id}).

    Only arguments that are not None are sent.

    Args:
        playlist_id: Playlist id, ``spotify:playlist:`` URI or share URL.
        name: New name (sent stripped).
        description: New description.
        public: New public flag.
        collaborative: New collaborative flag; when True and ``public`` is
            not given, ``public`` is sent as False.

    Returns:
        The normalized ``playlist_id`` and the list of ``updated_fields``.

    Raises:
        SpotifyAuthError: If the scope check fails, the id is empty, no field
            is given, collaborative and public are both True, or the API
            call fails.
    """
    _require_any_scope(
        ["playlist-modify-private", "playlist-modify-public"],
        "update playlist details",
    )
    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")

    requested: dict[str, Any] = {
        "name": None if name is None else name.strip(),
        "description": description,
        "public": public,
        "collaborative": collaborative,
    }
    body: dict[str, Any] = {
        field: value for field, value in requested.items() if value is not None
    }
    if not body:
        raise SpotifyAuthError("Provide at least one field to update.")

    # Spotify collaborative playlists must be private.
    if body.get("collaborative") is True:
        if body.get("public") is True:
            raise SpotifyAuthError("Collaborative playlists cannot be public.")
        if "public" not in body:
            body["public"] = False

    _spotify_request("PUT", f"/playlists/{pid}", json_body=body)
    return {
        "playlist_id": pid,
        "updated_fields": list(body),
    }
def spotify_replace_playlist_items(playlist_id: str, uris: list[str]) -> dict[str, Any]:
    """Replace all items in playlist (PUT /playlists/{id}/tracks).

    Args:
        playlist_id: Playlist id, ``spotify:playlist:`` URI or share URL.
        uris: Item URIs that become the playlist content; blank entries are
            ignored. At most 100.

    Returns:
        The normalized ``playlist_id``, the number of URIs in
        ``replaced_with`` and the ``snapshot_id`` reported by the API.

    Raises:
        SpotifyAuthError: If the scope check fails, the id is empty, no URI
            or more than 100 URIs are given, or the API call fails.
    """
    _require_any_scope(
        ["playlist-modify-private", "playlist-modify-public"],
        "replace playlist items",
    )
    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")

    clean_uris = [entry.strip() for entry in uris if entry and entry.strip()]
    uri_count = len(clean_uris)
    if uri_count == 0:
        raise SpotifyAuthError("Provide at least one URI.")
    if uri_count > 100:
        raise SpotifyAuthError("Spotify allows up to 100 URIs per request.")

    result = _spotify_request(
        "PUT",
        f"/playlists/{pid}/tracks",
        json_body={"uris": clean_uris},
    )
    return {
        "playlist_id": pid,
        "replaced_with": uri_count,
        "snapshot_id": result.get("snapshot_id"),
    }
def spotify_delete_playlist(playlist_id: str) -> dict[str, Any]:
    """Delete playlist from library (DELETE /playlists/{id}/followers).

    Args:
        playlist_id: Playlist id, ``spotify:playlist:`` URI or share URL.

    Returns:
        The normalized ``playlist_id``, ``deleted`` set to True and an
        explanatory ``note``.

    Raises:
        SpotifyAuthError: If the scope check fails, the id is empty or the
            API call fails.
    """
    accepted_scopes = ["playlist-modify-private", "playlist-modify-public"]
    _require_any_scope(accepted_scopes, "delete playlist")

    pid = _normalize_playlist_id(playlist_id)
    if not pid:
        raise SpotifyAuthError("playlist_id is required.")

    _spotify_request("DELETE", f"/playlists/{pid}/followers")
    return {
        "playlist_id": pid,
        "deleted": True,
        "note": "Spotify performs unfollow; own playlists are removed from your profile.",
    }
# HTTP application; the MCP SSE transport is mounted under /gradio_api/mcp.
app = FastAPI(
    title="Spotify MCP Server",
    version="1.0.0",
)
app.mount(
    "/gradio_api/mcp",
    mcp.sse_app("/gradio_api/mcp"),
)
def root() -> dict[str, Any]:
    """Describe the service: auth endpoint paths and MCP SSE locations."""
    description: dict[str, Any] = {
        "service": "spotify-mcp-server",
        "auth_login_url": "/auth/login",
        "auth_reset_url": "/auth/reset",
        "auth_status_url": "/auth/status",
        "mcp_sse_path": "/gradio_api/mcp/sse",
        "public_mcp_sse_url": MCP_SSE_URL,
    }
    return description
def health() -> dict[str, str]:
    """Report liveness with a constant payload."""
    return dict(status="ok")
def auth_status() -> dict[str, Any]:
    """Return the stored-token summary produced by ``_token_status``."""
    summary = _token_status()
    return summary
def auth_reset() -> dict[str, Any]:
    """Delete the persisted token file so a new login is required.

    Returns:
        ``reset`` (always True) and ``removed_token_file``, which is True
        only when a token file existed and was removed.

    Raises:
        HTTPException: With status 500 when the file cannot be removed.
    """
    had_token_file = SPOTIFY_TOKEN_FILE.exists()
    if had_token_file:
        try:
            SPOTIFY_TOKEN_FILE.unlink(missing_ok=True)
        except OSError as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Could not remove token file: {exc}",
            ) from exc
    return {"reset": True, "removed_token_file": had_token_file}
def auth_login(force: bool = Query(default=True)) -> RedirectResponse:
    """Start the OAuth flow by redirecting to Spotify's authorization page.

    Args:
        force: Passed as ``show_dialog`` when building the authorization URL.

    Raises:
        HTTPException: With status 500 when the OAuth configuration is
            incomplete.
    """
    try:
        authorize_url = _build_auth_url(_new_oauth_state(), show_dialog=force)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return RedirectResponse(url=authorize_url, status_code=307)
def auth_callback(
    code: str | None = Query(default=None),
    state: str | None = Query(default=None),
    error: str | None = Query(default=None),
) -> HTMLResponse:
    """Finish the OAuth flow: validate the state and store the tokens.

    Args:
        code: Authorization code sent by Spotify.
        state: State value that must match one issued by this server.
        error: Error reported by Spotify instead of a code.

    Returns:
        A small HTML confirmation page.

    Raises:
        HTTPException: With status 400 when Spotify reports an error, when
            code or state is missing, when the state is unknown or expired,
            or when the code-for-token exchange fails.
    """
    if error:
        raise HTTPException(status_code=400, detail=f"Spotify authorization error: {error}")
    if not (code and state):
        raise HTTPException(status_code=400, detail="Missing code/state in callback.")
    # The state is single-use: it is consumed whether or not it is valid.
    state_is_valid = _consume_oauth_state(state)
    if not state_is_valid:
        raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")

    try:
        _exchange_code_for_tokens(code)
    except SpotifyAuthError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # User-facing confirmation page (text kept exactly as served).
    html = """
<html>
<head><title>Spotify conectado</title></head>
<body style='font-family: Arial, sans-serif; padding: 24px;'>
<h2>Spotify conectado correctamente</h2>
<p>Ya puedes volver a tu cliente MCP y usar las tools.</p>
</body>
</html>
"""
    return HTMLResponse(content=html, status_code=200)
# Script entry point: serve the app with uvicorn on $PORT (default 7860),
# listening on all interfaces.
if __name__ == "__main__":
    import uvicorn

    port = int(os.environ.get("PORT", "7860"))
    uvicorn.run("app:app", host="0.0.0.0", port=port)