pharmaia commited on
Commit
d2742e5
·
verified ·
1 Parent(s): 8a85068

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +448 -314
app.py CHANGED
@@ -3,377 +3,511 @@ import json
3
  import os
4
  import secrets
5
  import time
6
- import urllib.parse
7
- from dataclasses import dataclass
8
  from pathlib import Path
9
  from typing import Any
 
10
 
11
- import gradio as gr
12
- import requests
13
  from dotenv import load_dotenv
14
-
 
 
15
 
16
  load_dotenv()
17
 
18
  SPOTIFY_ACCOUNTS_BASE = "https://accounts.spotify.com"
19
  SPOTIFY_API_BASE = "https://api.spotify.com/v1"
20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
- @dataclass
23
- class AuthConfig:
24
- client_id: str
25
- client_secret: str
26
- redirect_uri: str
27
- scopes: str
28
- token_file: Path
29
- state_file: Path
30
- env_refresh_token: str | None
31
 
 
 
 
 
 
32
 
33
- class SpotifyClient:
34
- def __init__(self, config: AuthConfig) -> None:
35
- self.config = config
36
 
37
- def _read_json(self, path: Path) -> dict[str, Any] | None:
38
- if not path.exists():
39
- return None
40
- try:
41
- return json.loads(path.read_text(encoding="utf-8"))
42
- except (json.JSONDecodeError, OSError):
43
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
 
45
- def _write_json(self, path: Path, payload: dict[str, Any]) -> None:
46
- path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
47
 
48
- def _read_token_data(self) -> dict[str, Any] | None:
49
- return self._read_json(self.config.token_file)
 
50
 
51
- def _write_token_data(self, token_data: dict[str, Any]) -> None:
52
- token_data["saved_at"] = int(time.time())
53
- self._write_json(self.config.token_file, token_data)
54
 
55
- def _basic_auth_header(self) -> str:
56
- raw = f"{self.config.client_id}:{self.config.client_secret}".encode("utf-8")
57
- return base64.b64encode(raw).decode("ascii")
58
 
59
- def _token_is_expired(self, token_data: dict[str, Any]) -> bool:
60
- expires_in = int(token_data.get("expires_in", 0))
61
- saved_at = int(token_data.get("saved_at", 0))
62
- return time.time() >= saved_at + expires_in - 60
 
 
 
 
 
63
 
64
- def _request_token(self, data: dict[str, Any]) -> dict[str, Any]:
65
- response = requests.post(
66
  f"{SPOTIFY_ACCOUNTS_BASE}/api/token",
67
- headers={
68
- "Authorization": f"Basic {self._basic_auth_header()}",
69
- "Content-Type": "application/x-www-form-urlencoded",
70
- },
71
  data=data,
72
- timeout=30,
73
  )
74
- response.raise_for_status()
75
- return response.json()
76
 
77
- def _refresh_token(self, refresh_token: str) -> dict[str, Any]:
78
- new_token = self._request_token(
79
- {
80
- "grant_type": "refresh_token",
81
- "refresh_token": refresh_token,
82
- }
83
- )
84
- new_token["refresh_token"] = new_token.get("refresh_token", refresh_token)
85
- self._write_token_data(new_token)
86
- return new_token
87
-
88
- def _get_access_token(self) -> str:
89
- token_data = self._read_token_data()
90
- if token_data:
91
- if self._token_is_expired(token_data):
92
- refresh = token_data.get("refresh_token") or self.config.env_refresh_token
93
- if not refresh:
94
- raise RuntimeError("Token expired and no refresh token is available.")
95
- token_data = self._refresh_token(refresh)
96
- token = token_data.get("access_token")
97
- if token:
98
- return token
99
-
100
- if self.config.env_refresh_token:
101
- token_data = self._refresh_token(self.config.env_refresh_token)
102
- token = token_data.get("access_token")
103
- if token:
104
- return token
105
-
106
- raise RuntimeError(
107
- "No auth session. Set SPOTIFY_REFRESH_TOKEN in env, or run spotify_auth_url and spotify_exchange_code."
108
- )
109
 
110
- def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
111
- headers = kwargs.pop("headers", {})
112
- headers["Authorization"] = f"Bearer {self._get_access_token()}"
113
- headers.setdefault("Content-Type", "application/json")
114
 
115
- response = requests.request(
116
- method,
117
- f"{SPOTIFY_API_BASE}{path}",
118
- headers=headers,
119
- timeout=30,
120
- **kwargs,
121
- )
122
- if response.status_code >= 400:
123
- raise RuntimeError(f"Spotify API error {response.status_code}: {response.text}")
124
- if response.status_code == 204:
125
- return {"ok": True}
126
- return response.json()
127
 
128
- def auth_url(self, state: str | None = None) -> dict[str, Any]:
129
- state_value = state or secrets.token_urlsafe(24)
130
- self._write_json(self.config.state_file, {"state": state_value, "saved_at": int(time.time())})
131
- url = (
132
- f"{SPOTIFY_ACCOUNTS_BASE}/authorize?"
133
- + urllib.parse.urlencode(
134
- {
135
- "client_id": self.config.client_id,
136
- "response_type": "code",
137
- "redirect_uri": self.config.redirect_uri,
138
- "scope": self.config.scopes,
139
- "state": state_value,
140
- "show_dialog": "true",
141
- }
142
- )
143
  )
144
- return {
145
- "auth_url": url,
146
- "state": state_value,
147
- "redirect_uri": self.config.redirect_uri,
148
- "next_step": "Open auth_url, approve app, then call spotify_exchange_code with returned code and state.",
149
- }
150
-
151
- def exchange_code(self, code: str, state: str | None = None) -> dict[str, Any]:
152
- state_data = self._read_json(self.config.state_file) or {}
153
- expected = state_data.get("state")
154
- if expected and state and state != expected:
155
- raise RuntimeError("OAuth state mismatch.")
156
-
157
- token_data = self._request_token(
158
- {
159
- "grant_type": "authorization_code",
160
- "code": code,
161
- "redirect_uri": self.config.redirect_uri,
162
- }
163
  )
164
- self._write_token_data(token_data)
165
- me = self.me()
166
- return {
167
- "status": "ok",
168
- "user_id": me.get("id"),
169
- "display_name": me.get("display_name"),
170
- "has_refresh_token": bool(token_data.get("refresh_token")),
171
- }
172
-
173
- def me(self) -> dict[str, Any]:
174
- me = self._request("GET", "/me")
175
- return {
176
- "id": me.get("id"),
177
- "display_name": me.get("display_name"),
178
- "email": me.get("email"),
179
- "product": me.get("product"),
180
- }
181
-
182
- def search_tracks(self, query: str, limit: int = 5) -> dict[str, Any]:
183
- payload = self._request(
184
- "GET",
185
- "/search",
186
- params={"q": query, "type": "track", "limit": max(1, min(limit, 50))},
187
  )
188
- items = payload.get("tracks", {}).get("items", [])
189
- return {
190
- "results": [
191
- {
192
- "id": t["id"],
193
- "name": t["name"],
194
- "artists": ", ".join(a["name"] for a in t.get("artists", [])),
195
- "uri": t["uri"],
196
- }
197
- for t in items
198
- ]
199
- }
200
-
201
- def create_playlist(self, name: str, description: str = "", public: bool = False) -> dict[str, Any]:
202
- playlist = self._request(
203
- "POST",
204
- "/me/playlists",
205
- json={"name": name, "description": description, "public": public},
206
  )
207
- return {
208
- "id": playlist.get("id"),
209
- "name": playlist.get("name"),
210
- "url": playlist.get("external_urls", {}).get("spotify"),
211
- }
212
-
213
- def add_tracks(self, playlist_id: str, track_ids: list[str]) -> dict[str, Any]:
214
- uris = [tid if tid.startswith("spotify:track:") else f"spotify:track:{tid}" for tid in track_ids]
215
- payload = self._request("POST", f"/playlists/{playlist_id}/items", json={"uris": uris})
216
- return {"snapshot_id": payload.get("snapshot_id"), "added": len(uris)}
217
-
218
-
219
- def load_config() -> AuthConfig:
220
- client_id = os.getenv("SPOTIFY_CLIENT_ID", "").strip()
221
- client_secret = os.getenv("SPOTIFY_CLIENT_SECRET", "").strip()
222
- redirect_uri = os.getenv("SPOTIFY_REDIRECT_URI", "").strip()
223
- scopes = os.getenv(
224
- "SPOTIFY_SCOPES",
225
- "playlist-modify-public playlist-modify-private user-read-private user-read-email",
226
- ).strip()
227
- token_file = Path(os.getenv("SPOTIFY_TOKEN_FILE", "spotify_tokens.json"))
228
- state_file = Path(os.getenv("SPOTIFY_STATE_FILE", "spotify_oauth_state.json"))
229
- env_refresh_token = os.getenv("SPOTIFY_REFRESH_TOKEN", "").strip() or None
230
-
231
- if not client_id or not client_secret or not redirect_uri:
232
- raise RuntimeError("SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET and SPOTIFY_REDIRECT_URI are required.")
233
-
234
- return AuthConfig(
235
- client_id=client_id,
236
- client_secret=client_secret,
237
- redirect_uri=redirect_uri,
238
- scopes=scopes,
239
- token_file=token_file,
240
- state_file=state_file,
241
- env_refresh_token=env_refresh_token,
242
- )
243
 
244
 
245
- _spotify: SpotifyClient | None = None
246
- _config_error: str | None = None
 
 
 
 
 
 
247
 
 
 
 
 
 
 
 
 
248
 
249
- def get_spotify() -> SpotifyClient:
250
- global _spotify, _config_error
251
- if _spotify is not None:
252
- return _spotify
253
- try:
254
- _spotify = SpotifyClient(load_config())
255
- _config_error = None
256
- return _spotify
257
- except Exception as exc:
258
- _config_error = str(exc)
259
- raise RuntimeError(
260
- "Spotify config missing/invalid. Set HF Space Secrets: "
261
- "SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, SPOTIFY_REDIRECT_URI. "
262
- f"Details: {_config_error}"
263
- ) from exc
264
 
 
 
 
 
 
 
265
 
266
- def spotify_auth_url(state: str = "") -> dict[str, Any]:
267
- return get_spotify().auth_url(state=state or None)
268
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
269
 
270
- def spotify_exchange_code(code: str, state: str = "") -> dict[str, Any]:
271
- return get_spotify().exchange_code(code=code, state=state or None)
272
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
273
 
274
- def spotify_me() -> dict[str, Any]:
275
- return get_spotify().me()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
276
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
 
278
- def search_tracks(query: str, limit: int = 5) -> dict[str, Any]:
279
- return get_spotify().search_tracks(query=query, limit=limit)
280
 
 
 
281
 
282
- def create_playlist(name: str, description: str = "", public: bool = False) -> dict[str, Any]:
283
- return get_spotify().create_playlist(name=name, description=description, public=public)
284
 
 
 
 
 
 
 
 
 
 
285
 
286
- def add_tracks_to_playlist(playlist_id: str, track_ids_csv: str) -> dict[str, Any]:
287
- track_ids = [x.strip() for x in track_ids_csv.split(",") if x.strip()]
288
- return get_spotify().add_tracks(playlist_id=playlist_id, track_ids=track_ids)
289
 
 
 
 
290
 
291
- def create_playlist_from_search(name: str, query: str, limit: int = 10, public: bool = False) -> dict[str, Any]:
292
- spotify = get_spotify()
293
- tracks = spotify.search_tracks(query=query, limit=limit).get("results", [])
294
- if not tracks:
295
- raise RuntimeError("No tracks found for query.")
296
- playlist = spotify.create_playlist(name=name, description=f"Auto playlist for query: {query}", public=public)
297
- added = spotify.add_tracks(playlist_id=playlist["id"], track_ids=[t["id"] for t in tracks])
298
- return {"playlist": playlist, "added": added, "tracks": tracks}
299
 
 
 
 
300
 
301
- auth_tab = gr.Interface(
302
- fn=spotify_auth_url,
303
- inputs=gr.Textbox(label="State (optional)", placeholder="optional_state"),
304
- outputs=gr.JSON(label="Spotify Auth URL"),
305
- title="Spotify Auth URL",
306
- api_name="_spotify_auth_url",
307
- )
308
 
309
- exchange_tab = gr.Interface(
310
- fn=spotify_exchange_code,
311
- inputs=[
312
- gr.Textbox(label="Authorization Code", placeholder="code from Spotify callback"),
313
- gr.Textbox(label="State (optional)", placeholder="state from spotify_auth_url"),
314
- ],
315
- outputs=gr.JSON(label="Exchange Result"),
316
- title="Spotify Exchange Code",
317
- api_name="_spotify_exchange_code",
318
- )
319
 
320
- me_tab = gr.Interface(
321
- fn=spotify_me,
322
- inputs=[],
323
- outputs=gr.JSON(label="Profile"),
324
- title="Spotify Me",
325
- api_name="_spotify_me",
326
- )
327
 
328
- search_tab = gr.Interface(
329
- fn=search_tracks,
330
- inputs=[gr.Textbox(label="Query"), gr.Slider(1, 50, value=5, step=1, label="Limit")],
331
- outputs=gr.JSON(label="Tracks"),
332
- title="Search Tracks",
333
- api_name="_search_tracks",
334
- )
 
335
 
336
- create_playlist_tab = gr.Interface(
337
- fn=create_playlist,
338
- inputs=[
339
- gr.Textbox(label="Playlist Name"),
340
- gr.Textbox(label="Description", placeholder="optional", lines=2),
341
- gr.Checkbox(label="Public", value=False),
342
- ],
343
- outputs=gr.JSON(label="Playlist"),
344
- title="Create Playlist",
345
- api_name="_create_playlist",
346
- )
347
 
348
- add_tracks_tab = gr.Interface(
349
- fn=add_tracks_to_playlist,
350
- inputs=[
351
- gr.Textbox(label="Playlist ID"),
352
- gr.Textbox(label="Track IDs CSV", placeholder="id1,id2,id3 or spotify:track:..."),
353
- ],
354
- outputs=gr.JSON(label="Add Tracks Result"),
355
- title="Add Tracks",
356
- api_name="_add_tracks_to_playlist",
357
- )
358
 
359
- auto_tab = gr.Interface(
360
- fn=create_playlist_from_search,
361
- inputs=[
362
- gr.Textbox(label="Playlist Name"),
363
- gr.Textbox(label="Search Query"),
364
- gr.Slider(1, 50, value=10, step=1, label="Limit"),
365
- gr.Checkbox(label="Public", value=False),
366
- ],
367
- outputs=gr.JSON(label="Result"),
368
- title="Create Playlist From Search",
369
- api_name="_create_playlist_from_search",
370
- )
 
 
 
371
 
372
- demo = gr.TabbedInterface(
373
- [auth_tab, exchange_tab, me_tab, search_tab, create_playlist_tab, add_tracks_tab, auto_tab],
374
- ["Auth URL", "Exchange Code", "Me", "Search", "Create Playlist", "Add Tracks", "Auto Playlist"],
375
- theme=gr.themes.Base(),
376
- )
377
 
378
  if __name__ == "__main__":
379
- demo.launch(mcp_server=True)
 
 
 
 
3
  import os
4
  import secrets
5
  import time
 
 
6
  from pathlib import Path
7
  from typing import Any
8
+ from urllib.parse import urlencode
9
 
10
+ import httpx
 
11
  from dotenv import load_dotenv
12
+ from fastapi import FastAPI, HTTPException, Query
13
+ from fastapi.responses import HTMLResponse, RedirectResponse
14
+ from mcp.server.fastmcp import FastMCP
15
 
16
  load_dotenv()
17
 
18
  SPOTIFY_ACCOUNTS_BASE = "https://accounts.spotify.com"
19
  SPOTIFY_API_BASE = "https://api.spotify.com/v1"
20
 
21
+ SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "").strip()
22
+ SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "").strip()
23
+ SPOTIFY_REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI", "").strip()
24
+ SPOTIFY_SCOPES = os.getenv(
25
+ "SPOTIFY_SCOPES",
26
+ "user-read-private user-read-email playlist-read-private "
27
+ "playlist-read-collaborative playlist-modify-private playlist-modify-public",
28
+ ).strip()
29
+ SPOTIFY_TOKEN_FILE = Path(os.getenv("SPOTIFY_TOKEN_FILE", "spotify_tokens.json")).resolve()
30
+ MCP_SSE_URL = os.getenv(
31
+ "MCP_PUBLIC_SSE_URL",
32
+ "https://pharmaia-demo-mcp-server-spotify.hf.space/gradio_api/mcp/sse",
33
+ ).strip()
34
+
35
+ # Keep OAuth states in memory for CSRF protection.
36
+ OAUTH_STATE_TTL_SECONDS = 600
37
+ _oauth_states: dict[str, int] = {}
38
+
39
+
40
+ class SpotifyAuthError(RuntimeError):
41
+ pass
42
+
43
+
44
+ def _require_spotify_config() -> None:
45
+ missing = []
46
+ if not SPOTIFY_CLIENT_ID:
47
+ missing.append("SPOTIFY_CLIENT_ID")
48
+ if not SPOTIFY_CLIENT_SECRET:
49
+ missing.append("SPOTIFY_CLIENT_SECRET")
50
+ if not SPOTIFY_REDIRECT_URI:
51
+ missing.append("SPOTIFY_REDIRECT_URI")
52
+
53
+ if missing:
54
+ raise SpotifyAuthError(
55
+ "Missing required environment variables: " + ", ".join(missing)
56
+ )
57
 
 
 
 
 
 
 
 
 
 
58
 
59
+ def _cleanup_expired_states() -> None:
60
+ now = int(time.time())
61
+ expired = [k for k, ts in _oauth_states.items() if (now - ts) > OAUTH_STATE_TTL_SECONDS]
62
+ for key in expired:
63
+ _oauth_states.pop(key, None)
64
 
 
 
 
65
 
66
+ def _new_oauth_state() -> str:
67
+ _cleanup_expired_states()
68
+ state = secrets.token_urlsafe(24)
69
+ _oauth_states[state] = int(time.time())
70
+ return state
71
+
72
+
73
+ def _consume_oauth_state(state: str) -> bool:
74
+ _cleanup_expired_states()
75
+ ts = _oauth_states.pop(state, None)
76
+ if ts is None:
77
+ return False
78
+ return (int(time.time()) - ts) <= OAUTH_STATE_TTL_SECONDS
79
+
80
+
81
+ def _spotify_basic_auth_header() -> str:
82
+ token = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}".encode("utf-8")
83
+ return base64.b64encode(token).decode("utf-8")
84
+
85
+
86
+ def _load_tokens() -> dict[str, Any] | None:
87
+ if not SPOTIFY_TOKEN_FILE.exists():
88
+ return None
89
+ try:
90
+ return json.loads(SPOTIFY_TOKEN_FILE.read_text(encoding="utf-8"))
91
+ except (json.JSONDecodeError, OSError):
92
+ return None
93
+
94
+
95
+ def _save_tokens(token_payload: dict[str, Any], previous: dict[str, Any] | None = None) -> dict[str, Any]:
96
+ now = int(time.time())
97
+ expires_in = int(token_payload.get("expires_in", 3600))
98
+
99
+ merged: dict[str, Any] = {}
100
+ if previous:
101
+ merged.update(previous)
102
+ merged.update(token_payload)
103
+
104
+ if not merged.get("refresh_token") and previous:
105
+ merged["refresh_token"] = previous.get("refresh_token")
106
+
107
+ merged["saved_at"] = now
108
+ merged["expires_at"] = now + expires_in
109
+
110
+ SPOTIFY_TOKEN_FILE.write_text(
111
+ json.dumps(merged, indent=2, ensure_ascii=True),
112
+ encoding="utf-8",
113
+ )
114
+ return merged
115
 
 
 
116
 
117
+ def _is_expired(tokens: dict[str, Any]) -> bool:
118
+ expires_at = int(tokens.get("expires_at", 0))
119
+ return int(time.time()) >= (expires_at - 30)
120
 
 
 
 
121
 
122
+ def _exchange_code_for_tokens(code: str) -> dict[str, Any]:
123
+ _require_spotify_config()
 
124
 
125
+ data = {
126
+ "grant_type": "authorization_code",
127
+ "code": code,
128
+ "redirect_uri": SPOTIFY_REDIRECT_URI,
129
+ }
130
+ headers = {
131
+ "Authorization": f"Basic {_spotify_basic_auth_header()}",
132
+ "Content-Type": "application/x-www-form-urlencoded",
133
+ }
134
 
135
+ with httpx.Client(timeout=20.0) as client:
136
+ response = client.post(
137
  f"{SPOTIFY_ACCOUNTS_BASE}/api/token",
 
 
 
 
138
  data=data,
139
+ headers=headers,
140
  )
 
 
141
 
142
+ if response.status_code != 200:
143
+ try:
144
+ payload = response.json()
145
+ except ValueError:
146
+ payload = {"error": response.text}
147
+ raise SpotifyAuthError(f"Failed token exchange: {payload}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
 
149
+ return _save_tokens(response.json())
 
 
 
150
 
 
 
 
 
 
 
 
 
 
 
 
 
151
 
152
+ def _refresh_access_token(tokens: dict[str, Any]) -> dict[str, Any]:
153
+ refresh_token = tokens.get("refresh_token")
154
+ if not refresh_token:
155
+ raise SpotifyAuthError(
156
+ "No refresh token found. Re-authenticate at /auth/login."
 
 
 
 
 
 
 
 
 
 
157
  )
158
+
159
+ data = {
160
+ "grant_type": "refresh_token",
161
+ "refresh_token": refresh_token,
162
+ }
163
+ headers = {
164
+ "Authorization": f"Basic {_spotify_basic_auth_header()}",
165
+ "Content-Type": "application/x-www-form-urlencoded",
166
+ }
167
+
168
+ with httpx.Client(timeout=20.0) as client:
169
+ response = client.post(
170
+ f"{SPOTIFY_ACCOUNTS_BASE}/api/token",
171
+ data=data,
172
+ headers=headers,
 
 
 
 
173
  )
174
+
175
+ if response.status_code != 200:
176
+ try:
177
+ payload = response.json()
178
+ except ValueError:
179
+ payload = {"error": response.text}
180
+ raise SpotifyAuthError(f"Failed token refresh: {payload}")
181
+
182
+ return _save_tokens(response.json(), previous=tokens)
183
+
184
+
185
+ def _get_valid_access_token() -> str:
186
+ _require_spotify_config()
187
+
188
+ tokens = _load_tokens()
189
+ if not tokens:
190
+ raise SpotifyAuthError(
191
+ "Spotify is not authenticated yet. Open /auth/login first."
 
 
 
 
 
192
  )
193
+
194
+ if _is_expired(tokens):
195
+ tokens = _refresh_access_token(tokens)
196
+
197
+ access_token = tokens.get("access_token")
198
+ if not access_token:
199
+ raise SpotifyAuthError(
200
+ "Access token missing. Re-authenticate at /auth/login."
 
 
 
 
 
 
 
 
 
 
201
  )
202
+ return str(access_token)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
 
204
 
205
+ def _spotify_request(
206
+ method: str,
207
+ path: str,
208
+ params: dict[str, Any] | None = None,
209
+ json_body: dict[str, Any] | None = None,
210
+ ) -> dict[str, Any]:
211
+ token = _get_valid_access_token()
212
+ headers = {"Authorization": f"Bearer {token}"}
213
 
214
+ with httpx.Client(timeout=20.0) as client:
215
+ response = client.request(
216
+ method,
217
+ f"{SPOTIFY_API_BASE}{path}",
218
+ params=params,
219
+ json=json_body,
220
+ headers=headers,
221
+ )
222
 
223
+ if response.status_code == 401:
224
+ refreshed = _refresh_access_token(_load_tokens() or {})
225
+ headers["Authorization"] = f"Bearer {refreshed.get('access_token', '')}"
226
+ response = client.request(
227
+ method,
228
+ f"{SPOTIFY_API_BASE}{path}",
229
+ params=params,
230
+ json=json_body,
231
+ headers=headers,
232
+ )
 
 
 
 
 
233
 
234
+ if response.status_code >= 400:
235
+ try:
236
+ payload = response.json()
237
+ except ValueError:
238
+ payload = {"error": response.text}
239
+ raise SpotifyAuthError(f"Spotify API error {response.status_code}: {payload}")
240
 
241
+ if not response.content:
242
+ return {}
243
 
244
+ try:
245
+ return response.json()
246
+ except ValueError:
247
+ return {"raw": response.text}
248
+
249
+
250
+ def _build_auth_url(state: str) -> str:
251
+ _require_spotify_config()
252
+ params = {
253
+ "response_type": "code",
254
+ "client_id": SPOTIFY_CLIENT_ID,
255
+ "scope": SPOTIFY_SCOPES,
256
+ "redirect_uri": SPOTIFY_REDIRECT_URI,
257
+ "state": state,
258
+ "show_dialog": "false",
259
+ }
260
+ return f"{SPOTIFY_ACCOUNTS_BASE}/authorize?{urlencode(params)}"
261
+
262
+
263
+ def _token_status() -> dict[str, Any]:
264
+ tokens = _load_tokens() or {}
265
+ authenticated = bool(tokens.get("access_token"))
266
+ return {
267
+ "authenticated": authenticated,
268
+ "token_file": str(SPOTIFY_TOKEN_FILE),
269
+ "expires_at": tokens.get("expires_at"),
270
+ "scopes": tokens.get("scope"),
271
+ }
272
+
273
+
274
+ def _short_playlist(playlist: dict[str, Any]) -> dict[str, Any]:
275
+ return {
276
+ "id": playlist.get("id"),
277
+ "name": playlist.get("name"),
278
+ "uri": playlist.get("uri"),
279
+ "public": playlist.get("public"),
280
+ "owner": (playlist.get("owner") or {}).get("id"),
281
+ "total_items": (playlist.get("tracks") or {}).get("total"),
282
+ "external_url": (playlist.get("external_urls") or {}).get("spotify"),
283
+ }
284
+
285
+
286
+ def _short_track(track: dict[str, Any]) -> dict[str, Any]:
287
+ return {
288
+ "id": track.get("id"),
289
+ "name": track.get("name"),
290
+ "uri": track.get("uri"),
291
+ "artists": [a.get("name") for a in track.get("artists", [])],
292
+ "album": (track.get("album") or {}).get("name"),
293
+ "external_url": (track.get("external_urls") or {}).get("spotify"),
294
+ }
295
+
296
+
297
+ mcp = FastMCP(
298
+ name="Spotify MCP Server",
299
+ instructions=(
300
+ "MCP server to query Spotify profile, search tracks, and manage playlists. "
301
+ "If not authenticated, user must open /auth/login first."
302
+ ),
303
+ )
304
 
 
 
305
 
306
+ @mcp.tool()
307
+ def spotify_auth_status() -> dict[str, Any]:
308
+ """Return whether the server already has a valid Spotify token."""
309
+ return _token_status()
310
+
311
+
312
+ @mcp.tool()
313
+ def spotify_get_auth_url() -> dict[str, str]:
314
+ """Generate the Spotify OAuth URL. Open it in a browser to connect the account."""
315
+ state = _new_oauth_state()
316
+ return {
317
+ "auth_url": _build_auth_url(state),
318
+ "callback": SPOTIFY_REDIRECT_URI,
319
+ "note": "Open auth_url in browser and finish login/consent.",
320
+ }
321
+
322
+
323
+ @mcp.tool()
324
+ def spotify_get_my_profile() -> dict[str, Any]:
325
+ """Get current Spotify user profile (/me)."""
326
+ me = _spotify_request("GET", "/me")
327
+ return {
328
+ "id": me.get("id"),
329
+ "display_name": me.get("display_name"),
330
+ "uri": me.get("uri"),
331
+ "country": me.get("country"),
332
+ "product": me.get("product"),
333
+ }
334
+
335
+
336
+ @mcp.tool()
337
+ def spotify_search_tracks(query: str, limit: int = 5, offset: int = 0) -> dict[str, Any]:
338
+ """Search tracks using Spotify /search endpoint."""
339
+ safe_limit = max(1, min(limit, 10))
340
+ safe_offset = max(0, offset)
341
+
342
+ payload = _spotify_request(
343
+ "GET",
344
+ "/search",
345
+ params={
346
+ "q": query,
347
+ "type": "track",
348
+ "limit": safe_limit,
349
+ "offset": safe_offset,
350
+ },
351
+ )
352
+ tracks = payload.get("tracks") or {}
353
+ items = [_short_track(t) for t in tracks.get("items", [])]
354
+
355
+ return {
356
+ "query": query,
357
+ "limit": safe_limit,
358
+ "offset": safe_offset,
359
+ "total": tracks.get("total", 0),
360
+ "items": items,
361
+ }
362
+
363
+
364
+ @mcp.tool()
365
+ def spotify_list_my_playlists(limit: int = 10, offset: int = 0) -> dict[str, Any]:
366
+ """List playlists from current user (/me/playlists)."""
367
+ safe_limit = max(1, min(limit, 50))
368
+ safe_offset = max(0, offset)
369
+
370
+ payload = _spotify_request(
371
+ "GET",
372
+ "/me/playlists",
373
+ params={"limit": safe_limit, "offset": safe_offset},
374
+ )
375
 
376
+ return {
377
+ "limit": safe_limit,
378
+ "offset": safe_offset,
379
+ "total": payload.get("total", 0),
380
+ "items": [_short_playlist(p) for p in payload.get("items", [])],
381
+ }
382
+
383
+
384
+ @mcp.tool()
385
+ def spotify_create_playlist(
386
+ name: str,
387
+ description: str = "",
388
+ public: bool = False,
389
+ ) -> dict[str, Any]:
390
+ """Create a playlist in the current user's account."""
391
+ if not name.strip():
392
+ raise SpotifyAuthError("Playlist name cannot be empty.")
393
+
394
+ me = _spotify_request("GET", "/me")
395
+ user_id = str(me.get("id", "")).strip()
396
+ if not user_id:
397
+ raise SpotifyAuthError("Could not resolve current Spotify user id.")
398
+
399
+ payload = _spotify_request(
400
+ "POST",
401
+ f"/users/{user_id}/playlists",
402
+ json_body={
403
+ "name": name.strip(),
404
+ "description": description,
405
+ "public": public,
406
+ },
407
+ )
408
 
409
+ return {
410
+ "id": payload.get("id"),
411
+ "name": payload.get("name"),
412
+ "uri": payload.get("uri"),
413
+ "public": payload.get("public"),
414
+ "external_url": (payload.get("external_urls") or {}).get("spotify"),
415
+ }
416
+
417
+
418
+ @mcp.tool()
419
+ def spotify_add_items_to_playlist(playlist_id: str, uris: list[str]) -> dict[str, Any]:
420
+ """Add Spotify item URIs to a playlist (/playlists/{id}/items)."""
421
+ clean_uris = [u.strip() for u in uris if u and u.strip()]
422
+
423
+ if not playlist_id.strip():
424
+ raise SpotifyAuthError("playlist_id is required.")
425
+ if not clean_uris:
426
+ raise SpotifyAuthError("Provide at least one URI.")
427
+ if len(clean_uris) > 100:
428
+ raise SpotifyAuthError("Spotify allows up to 100 URIs per request.")
429
+
430
+ payload = _spotify_request(
431
+ "POST",
432
+ f"/playlists/{playlist_id.strip()}/items",
433
+ json_body={"uris": clean_uris},
434
+ )
435
+ return {
436
+ "playlist_id": playlist_id.strip(),
437
+ "added": len(clean_uris),
438
+ "snapshot_id": payload.get("snapshot_id"),
439
+ }
440
 
 
 
441
 
442
+ app = FastAPI(title="Spotify MCP Server", version="1.0.0")
443
+ app.mount("/gradio_api/mcp", mcp.sse_app("/gradio_api/mcp"))
444
 
 
 
445
 
446
+ @app.get("/")
447
+ def root() -> dict[str, Any]:
448
+ return {
449
+ "service": "spotify-mcp-server",
450
+ "auth_login_url": "/auth/login",
451
+ "auth_status_url": "/auth/status",
452
+ "mcp_sse_path": "/gradio_api/mcp/sse",
453
+ "public_mcp_sse_url": MCP_SSE_URL,
454
+ }
455
 
 
 
 
456
 
457
+ @app.get("/health")
458
+ def health() -> dict[str, str]:
459
+ return {"status": "ok"}
460
 
 
 
 
 
 
 
 
 
461
 
462
+ @app.get("/auth/status")
463
+ def auth_status() -> dict[str, Any]:
464
+ return _token_status()
465
 
 
 
 
 
 
 
 
466
 
467
+ @app.get("/auth/login")
468
+ def auth_login() -> RedirectResponse:
469
+ try:
470
+ state = _new_oauth_state()
471
+ url = _build_auth_url(state)
472
+ except SpotifyAuthError as exc:
473
+ raise HTTPException(status_code=500, detail=str(exc)) from exc
474
+ return RedirectResponse(url=url, status_code=307)
 
 
475
 
 
 
 
 
 
 
 
476
 
477
+ @app.get("/auth/callback", response_class=HTMLResponse)
478
+ def auth_callback(
479
+ code: str | None = Query(default=None),
480
+ state: str | None = Query(default=None),
481
+ error: str | None = Query(default=None),
482
+ ) -> HTMLResponse:
483
+ if error:
484
+ raise HTTPException(status_code=400, detail=f"Spotify authorization error: {error}")
485
 
486
+ if not code or not state:
487
+ raise HTTPException(status_code=400, detail="Missing code/state in callback.")
 
 
 
 
 
 
 
 
 
488
 
489
+ if not _consume_oauth_state(state):
490
+ raise HTTPException(status_code=400, detail="Invalid or expired OAuth state.")
 
 
 
 
 
 
 
 
491
 
492
+ try:
493
+ _exchange_code_for_tokens(code)
494
+ except SpotifyAuthError as exc:
495
+ raise HTTPException(status_code=400, detail=str(exc)) from exc
496
+
497
+ html = """
498
+ <html>
499
+ <head><title>Spotify conectado</title></head>
500
+ <body style='font-family: Arial, sans-serif; padding: 24px;'>
501
+ <h2>Spotify conectado correctamente</h2>
502
+ <p>Ya puedes volver a tu cliente MCP y usar las tools.</p>
503
+ </body>
504
+ </html>
505
+ """
506
+ return HTMLResponse(content=html, status_code=200)
507
 
 
 
 
 
 
508
 
509
  if __name__ == "__main__":
510
+ import uvicorn
511
+
512
+ port = int(os.getenv("PORT", "7860"))
513
+ uvicorn.run("app:app", host="0.0.0.0", port=port)