pharmaia commited on
Commit
4711709
·
verified ·
1 Parent(s): f091267

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +209 -3
app.py CHANGED
@@ -21,11 +21,29 @@ SPOTIFY_API_BASE = "https://api.spotify.com/v1"
21
  SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "").strip()
22
  SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "").strip()
23
  SPOTIFY_REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI", "").strip()
24
- SPOTIFY_SCOPES = os.getenv(
25
  "SPOTIFY_SCOPES",
26
  "user-read-private user-read-email playlist-read-private "
27
- "playlist-read-collaborative playlist-modify-private playlist-modify-public",
 
28
  ).strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  SPOTIFY_TOKEN_FILE = Path(os.getenv("SPOTIFY_TOKEN_FILE", "spotify_tokens.json")).resolve()
30
  MCP_SSE_URL = os.getenv(
31
  "MCP_PUBLIC_SSE_URL",
@@ -294,10 +312,47 @@ def _short_track(track: dict[str, Any]) -> dict[str, Any]:
294
  }
295
 
296
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
  mcp = FastMCP(
298
  name="Spotify MCP Server",
299
  instructions=(
300
- "MCP server to query Spotify profile, search tracks, and manage playlists. "
 
301
  "If not authenticated, user must open /auth/login first."
302
  ),
303
  )
@@ -361,6 +416,157 @@ def spotify_search_tracks(query: str, limit: int = 5, offset: int = 0) -> dict[s
361
  }
362
 
363
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
364
  @mcp.tool()
365
  def spotify_list_my_playlists(limit: int = 10, offset: int = 0) -> dict[str, Any]:
366
  """List playlists from current user (/me/playlists)."""
 
21
  SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID", "").strip()
22
  SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET", "").strip()
23
  SPOTIFY_REDIRECT_URI = os.getenv("SPOTIFY_REDIRECT_URI", "").strip()
24
+ _SPOTIFY_SCOPES_RAW = os.getenv(
25
  "SPOTIFY_SCOPES",
26
  "user-read-private user-read-email playlist-read-private "
27
+ "playlist-read-collaborative playlist-modify-private playlist-modify-public "
28
+ "user-library-read user-read-recently-played",
29
  ).strip()
30
+ _REQUIRED_SCOPES = [
31
+ "user-read-private",
32
+ "user-read-email",
33
+ "playlist-read-private",
34
+ "playlist-read-collaborative",
35
+ "playlist-modify-private",
36
+ "playlist-modify-public",
37
+ "user-library-read",
38
+ "user-library-modify",
39
+ "user-read-recently-played",
40
+ "user-top-read",
41
+ ]
42
+ _scope_parts = _SPOTIFY_SCOPES_RAW.split()
43
+ for _required_scope in _REQUIRED_SCOPES:
44
+ if _required_scope not in _scope_parts:
45
+ _scope_parts.append(_required_scope)
46
+ SPOTIFY_SCOPES = " ".join(_scope_parts)
47
  SPOTIFY_TOKEN_FILE = Path(os.getenv("SPOTIFY_TOKEN_FILE", "spotify_tokens.json")).resolve()
48
  MCP_SSE_URL = os.getenv(
49
  "MCP_PUBLIC_SSE_URL",
 
312
  }
313
 
314
 
315
+ def _normalize_track_id(track_ref: str) -> str:
316
+ value = track_ref.strip()
317
+ if not value:
318
+ return ""
319
+
320
+ if value.startswith("spotify:track:"):
321
+ return value.split(":")[-1].strip()
322
+
323
+ if "open.spotify.com/track/" in value:
324
+ return value.split("open.spotify.com/track/")[-1].split("?")[0].strip().strip("/")
325
+
326
+ return value
327
+
328
+
329
+ def _normalize_track_ids(track_ids: list[str], max_items: int = 500) -> list[str]:
330
+ normalized: list[str] = []
331
+ seen: set[str] = set()
332
+
333
+ for raw in track_ids:
334
+ item = _normalize_track_id(raw)
335
+ if item and item not in seen:
336
+ normalized.append(item)
337
+ seen.add(item)
338
+
339
+ if not normalized:
340
+ raise SpotifyAuthError("Provide at least one valid track id or track uri.")
341
+ if len(normalized) > max_items:
342
+ raise SpotifyAuthError(f"Too many track ids. Maximum allowed is {max_items}.")
343
+
344
+ return normalized
345
+
346
+
347
+ def _chunked(items: list[str], size: int) -> list[list[str]]:
348
+ return [items[i : i + size] for i in range(0, len(items), size)]
349
+
350
+
351
  mcp = FastMCP(
352
  name="Spotify MCP Server",
353
  instructions=(
354
+ "MCP server to query Spotify profile, search tracks, top tracks, library, "
355
+ "listening history, and manage playlists. "
356
  "If not authenticated, user must open /auth/login first."
357
  ),
358
  )
 
416
  }
417
 
418
 
419
+ @mcp.tool()
420
+ def spotify_list_saved_tracks(limit: int = 20, offset: int = 0) -> dict[str, Any]:
421
+ """List user's saved (liked) tracks from library (/me/tracks)."""
422
+ safe_limit = max(1, min(limit, 50))
423
+ safe_offset = max(0, offset)
424
+
425
+ payload = _spotify_request(
426
+ "GET",
427
+ "/me/tracks",
428
+ params={"limit": safe_limit, "offset": safe_offset},
429
+ )
430
+
431
+ items: list[dict[str, Any]] = []
432
+ for item in payload.get("items", []):
433
+ track = _short_track(item.get("track") or {})
434
+ track["added_at"] = item.get("added_at")
435
+ items.append(track)
436
+
437
+ return {
438
+ "limit": safe_limit,
439
+ "offset": safe_offset,
440
+ "total": payload.get("total", 0),
441
+ "items": items,
442
+ }
443
+
444
+
445
+ @mcp.tool()
446
+ def spotify_list_recently_played(limit: int = 20) -> dict[str, Any]:
447
+ """List recently played tracks (/me/player/recently-played)."""
448
+ safe_limit = max(1, min(limit, 50))
449
+
450
+ payload = _spotify_request(
451
+ "GET",
452
+ "/me/player/recently-played",
453
+ params={"limit": safe_limit},
454
+ )
455
+
456
+ items: list[dict[str, Any]] = []
457
+ for item in payload.get("items", []):
458
+ track = _short_track(item.get("track") or {})
459
+ track["played_at"] = item.get("played_at")
460
+ context = item.get("context") or {}
461
+ track["context_type"] = context.get("type")
462
+ track["context_uri"] = context.get("uri")
463
+ items.append(track)
464
+
465
+ return {
466
+ "limit": safe_limit,
467
+ "items": items,
468
+ "cursors": payload.get("cursors"),
469
+ }
470
+
471
+
472
+ @mcp.tool()
473
+ def spotify_get_top_tracks(
474
+ time_range: str = "medium_term",
475
+ limit: int = 20,
476
+ offset: int = 0,
477
+ ) -> dict[str, Any]:
478
+ """Get user top tracks (/me/top/tracks)."""
479
+ allowed_ranges = {"short_term", "medium_term", "long_term"}
480
+ safe_range = time_range if time_range in allowed_ranges else "medium_term"
481
+ safe_limit = max(1, min(limit, 50))
482
+ safe_offset = max(0, offset)
483
+
484
+ payload = _spotify_request(
485
+ "GET",
486
+ "/me/top/tracks",
487
+ params={
488
+ "time_range": safe_range,
489
+ "limit": safe_limit,
490
+ "offset": safe_offset,
491
+ },
492
+ )
493
+
494
+ return {
495
+ "time_range": safe_range,
496
+ "limit": safe_limit,
497
+ "offset": safe_offset,
498
+ "total": payload.get("total", 0),
499
+ "items": [_short_track(t) for t in payload.get("items", [])],
500
+ }
501
+
502
+
503
+ @mcp.tool()
504
+ def spotify_check_saved_tracks(track_ids: list[str]) -> dict[str, Any]:
505
+ """Check if tracks are saved in user's library (/me/tracks/contains)."""
506
+ ids = _normalize_track_ids(track_ids)
507
+ chunks = _chunked(ids, 50)
508
+ states: list[bool] = []
509
+
510
+ for chunk in chunks:
511
+ payload = _spotify_request(
512
+ "GET",
513
+ "/me/tracks/contains",
514
+ params={"ids": ",".join(chunk)},
515
+ )
516
+ if isinstance(payload, list):
517
+ states.extend(bool(x) for x in payload)
518
+ else:
519
+ raise SpotifyAuthError("Unexpected response for /me/tracks/contains.")
520
+
521
+ items = [{"id": track_id, "saved": states[idx]} for idx, track_id in enumerate(ids)]
522
+ return {"total": len(ids), "items": items}
523
+
524
+
525
+ @mcp.tool()
526
+ def spotify_save_tracks(track_ids: list[str]) -> dict[str, Any]:
527
+ """Save tracks to user library (PUT /me/tracks)."""
528
+ ids = _normalize_track_ids(track_ids)
529
+ chunks = _chunked(ids, 50)
530
+
531
+ for chunk in chunks:
532
+ _spotify_request(
533
+ "PUT",
534
+ "/me/tracks",
535
+ params={"ids": ",".join(chunk)},
536
+ )
537
+
538
+ return {"saved": len(ids), "track_ids": ids}
539
+
540
+
541
+ @mcp.tool()
542
+ def spotify_remove_saved_tracks(track_ids: list[str]) -> dict[str, Any]:
543
+ """Remove tracks from user library (DELETE /me/tracks)."""
544
+ ids = _normalize_track_ids(track_ids)
545
+ chunks = _chunked(ids, 50)
546
+
547
+ for chunk in chunks:
548
+ _spotify_request(
549
+ "DELETE",
550
+ "/me/tracks",
551
+ params={"ids": ",".join(chunk)},
552
+ )
553
+
554
+ return {"removed": len(ids), "track_ids": ids}
555
+
556
+
557
+ @mcp.tool()
558
+ def spotify_set_tracks_saved(track_ids: list[str], saved: bool) -> dict[str, Any]:
559
+ """Set saved state for tracks in user library (alta/baja)."""
560
+ if saved:
561
+ result = spotify_save_tracks(track_ids)
562
+ result["action"] = "saved"
563
+ return result
564
+
565
+ result = spotify_remove_saved_tracks(track_ids)
566
+ result["action"] = "removed"
567
+ return result
568
+
569
+
570
  @mcp.tool()
571
  def spotify_list_my_playlists(limit: int = 10, offset: int = 0) -> dict[str, Any]:
572
  """List playlists from current user (/me/playlists)."""