Vicoka / app /main.py
nexusbert's picture
push
796681c
from io import BytesIO
from fastapi import Depends, FastAPI, File, HTTPException, UploadFile, Header
from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from PIL import Image
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from .db import Base, engine, get_db
from .models import MoodLog, RecommendationLog
from .mood_text_model import analyze_text_emotion
from .mood_face_model import analyze_face_emotion
from .spotify_client import recommend_track_for_emotion, get_track_via_bearer, get_available_genre_seeds, get_app_access_token
from .spotify_user_api import get_current_user_profile
from .spotify_user_reco import recommend_user_track_for_emotion
from .spotify_http import SpotifyAPIError
from .spotify_oauth import (
build_authorize_url,
default_redirect_uri,
exchange_code_for_token,
exchange_code_for_token_pkce,
generate_state,
refresh_access_token,
validate_redirect_uri,
)
from .spotify_playlists_api import (
add_items_to_playlist,
create_playlist,
get_playlist,
get_playlist_items,
list_current_user_playlists,
)
from .spotify_ids import parse_spotify_ref, to_track_uri
from .spotify_catalog_api import (
get_artist,
get_album_tracks,
get_all_artist_tracks_via_albums,
get_artist_albums,
search_tracks_by_artist,
get_artist_top_tracks,
)
from .spotify_playlist_reco import recommend_playlist_for_emotion, pick_random_track_from_playlist
from .spotify_player_api import (
add_to_queue,
get_available_devices,
get_currently_playing,
get_playback_state,
pause_playback,
skip_next,
skip_previous,
start_or_resume_playback,
transfer_playback,
)
Base.metadata.create_all(bind=engine)
app = FastAPI(title="VibeCheck – Mood-based Spotify Recommender")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class TextMoodRequest(BaseModel):
text: str
class RecommendationResponse(BaseModel):
mood_label: str
mood_score: float
spotify_track_id: str | None
track_name: str | None
artists: list[str] | None
preview_url: str | None
external_url: str | None
spotify_playlist_id: str | None = None
playlist_name: str | None = None
playlist_owner: str | None = None
playlist_image_url: str | None = None
playlist_external_url: str | None = None
class SpotifyMobileTokenRequest(BaseModel):
code: str
code_verifier: str
redirect_uri: str
class SpotifyMobileRefreshRequest(BaseModel):
refresh_token: str
def _extract_bearer_token(authorization: str | None) -> str:
if not authorization:
raise HTTPException(
status_code=401,
detail="Missing Authorization header. Use: Authorization: Bearer <access_token>",
)
if not authorization.lower().startswith("bearer "):
raise HTTPException(
status_code=401,
detail="Invalid Authorization header format. Use: Bearer <access_token>",
)
return authorization.split(" ", 1)[1].strip()
@app.post("/mood/text", response_model=RecommendationResponse)
def mood_from_text(
body: TextMoodRequest, db: Session = Depends(get_db)
) -> RecommendationResponse:
if not body.text.strip():
raise HTTPException(status_code=400, detail="Text cannot be empty")
label, score = analyze_text_emotion(body.text)
try:
playlist = recommend_playlist_for_emotion(label)
except SpotifyAPIError:
playlist = {}
track: dict = {}
track_id: str | None = None
try:
if playlist:
track = pick_random_track_from_playlist(
playlist_id=playlist.get("id"),
)
track_id = track.get("id") if track else None
except SpotifyAPIError:
track = {}
track_id = None
if not track_id:
track = recommend_track_for_emotion(label, source="text") or {}
track_id = track.get("id") if track else None
playlist_id = playlist.get("id") if playlist else None
try:
mood_log = MoodLog(
source="text",
raw_input=body.text[:512],
emotion_label=label,
emotion_score=score,
)
db.add(mood_log)
db.flush()
if track_id:
rec_log = RecommendationLog(
user_id=None,
mood_id=mood_log.id,
spotify_track_id=track_id,
)
db.add(rec_log)
db.commit()
except SQLAlchemyError:
db.rollback()
return RecommendationResponse(
mood_label=label,
mood_score=score,
spotify_track_id=track_id,
track_name=track.get("name") if track else None,
artists=track.get("artists") if track else None,
preview_url=track.get("preview_url") if track else None,
external_url=track.get("external_url") if track else None,
spotify_playlist_id=playlist_id,
playlist_name=playlist.get("name") if playlist else None,
playlist_owner=playlist.get("owner") if playlist else None,
playlist_image_url=playlist.get("image_url") if playlist else None,
playlist_external_url=playlist.get("external_url") if playlist else None,
)
@app.post("/mood/face", response_model=RecommendationResponse)
async def mood_from_face(
file: UploadFile = File(...), db: Session = Depends(get_db)
) -> RecommendationResponse:
contents = await file.read()
try:
image = Image.open(BytesIO(contents)).convert("RGB")
except Exception as exc:
raise HTTPException(status_code=400, detail="Invalid image file") from exc
label, score = analyze_face_emotion(image)
try:
playlist = recommend_playlist_for_emotion(label)
except SpotifyAPIError:
playlist = {}
track: dict = {}
track_id: str | None = None
try:
if playlist:
track = pick_random_track_from_playlist(
playlist_id=playlist.get("id"),
)
track_id = track.get("id") if track else None
except SpotifyAPIError:
track = {}
track_id = None
if not track_id:
track = recommend_track_for_emotion(label, source="face") or {}
track_id = track.get("id") if track else None
playlist_id = playlist.get("id") if playlist else None
try:
mood_log = MoodLog(
source="face",
raw_input=None,
emotion_label=label,
emotion_score=score,
)
db.add(mood_log)
db.flush()
if track_id:
rec_log = RecommendationLog(
user_id=None,
mood_id=mood_log.id,
spotify_track_id=track_id,
)
db.add(rec_log)
db.commit()
except SQLAlchemyError:
db.rollback()
return RecommendationResponse(
mood_label=label,
mood_score=score,
spotify_track_id=track_id,
track_name=track.get("name") if track else None,
artists=track.get("artists") if track else None,
preview_url=track.get("preview_url") if track else None,
external_url=track.get("external_url") if track else None,
spotify_playlist_id=playlist_id,
playlist_name=playlist.get("name") if playlist else None,
playlist_owner=playlist.get("owner") if playlist else None,
playlist_image_url=playlist.get("image_url") if playlist else None,
playlist_external_url=playlist.get("external_url") if playlist else None,
)
@app.post("/mood/text/user", response_model=RecommendationResponse)
def mood_from_text_user(
body: TextMoodRequest,
authorization: str | None = Header(default=None),
db: Session = Depends(get_db),
) -> RecommendationResponse:
if not body.text.strip():
raise HTTPException(status_code=400, detail="Text cannot be empty")
token = _extract_bearer_token(authorization)
label, score = analyze_text_emotion(body.text)
try:
track = recommend_user_track_for_emotion(token, label)
except SpotifyAPIError:
track = recommend_track_for_emotion(label, source="user") or {}
track_id: str | None = track.get("id") if track else None
try:
mood_log = MoodLog(
source="text_user",
raw_input=body.text[:512],
emotion_label=label,
emotion_score=score,
)
db.add(mood_log)
db.flush()
if track_id:
rec_log = RecommendationLog(
user_id=None,
mood_id=mood_log.id,
spotify_track_id=track_id,
)
db.add(rec_log)
db.commit()
except SQLAlchemyError:
db.rollback()
return RecommendationResponse(
mood_label=label,
mood_score=score,
spotify_track_id=track_id,
track_name=track.get("name") if track else None,
artists=track.get("artists") if track else None,
preview_url=track.get("preview_url") if track else None,
external_url=track.get("external_url") if track else None,
spotify_playlist_id=None,
playlist_name=None,
playlist_owner=None,
playlist_image_url=None,
playlist_external_url=None,
)
@app.post("/mood/face/user", response_model=RecommendationResponse)
async def mood_from_face_user(
authorization: str | None = Header(default=None),
file: UploadFile = File(...),
db: Session = Depends(get_db),
) -> RecommendationResponse:
token = _extract_bearer_token(authorization)
contents = await file.read()
try:
image = Image.open(BytesIO(contents)).convert("RGB")
except Exception as exc:
raise HTTPException(status_code=400, detail="Invalid image file") from exc
label, score = analyze_face_emotion(image)
try:
track = recommend_user_track_for_emotion(token, label)
except SpotifyAPIError:
track = recommend_track_for_emotion(label, source="user") or {}
track_id: str | None = track.get("id") if track else None
try:
mood_log = MoodLog(
source="face_user",
raw_input=None,
emotion_label=label,
emotion_score=score,
)
db.add(mood_log)
db.flush()
if track_id:
rec_log = RecommendationLog(
user_id=None,
mood_id=mood_log.id,
spotify_track_id=track_id,
)
db.add(rec_log)
db.commit()
except SQLAlchemyError:
db.rollback()
return RecommendationResponse(
mood_label=label,
mood_score=score,
spotify_track_id=track_id,
track_name=track.get("name") if track else None,
artists=track.get("artists") if track else None,
preview_url=track.get("preview_url") if track else None,
external_url=track.get("external_url") if track else None,
spotify_playlist_id=None,
playlist_name=None,
playlist_owner=None,
playlist_image_url=None,
playlist_external_url=None,
)
@app.get("/health")
def health() -> dict:
return {"status": "ok"}
@app.get("/spotify/tracks/{track_id}")
def spotify_track(track_id: str, market: str | None = None) -> dict:
"""
Demo endpoint: fetches track info using Spotify Web API with
Authorization: Bearer <token> (app-only Client Credentials token).
"""
try:
return get_track_via_bearer(track_id, market=market)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/genres")
def spotify_genres(authorization: str | None = Header(default=None)) -> dict:
"""
Get available genre seed values for Spotify Recommendations API.
Accepts optional Authorization header:
Authorization: Bearer <access_token>
If not provided, uses app-only token (Client Credentials).
"""
token = None
if authorization:
try:
token = _extract_bearer_token(authorization)
except HTTPException:
pass
try:
genres = get_available_genre_seeds(access_token=token)
return {"genres": genres}
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/artists/{artist_name}/tracks")
def spotify_artist_tracks(
artist_name: str,
limit: int = 50,
market: str | None = None,
) -> dict:
token = get_app_access_token()
mk = market or "NG"
try:
items = search_tracks_by_artist(token, artist_name=artist_name, limit=limit, market=mk)
tracks = []
for t in items:
tracks.append(
{
"id": t.get("id"),
"name": t.get("name"),
"uri": t.get("uri"),
"artists": [a.get("name") for a in t.get("artists", [])],
"preview_url": t.get("preview_url"),
"external_url": (t.get("external_urls") or {}).get("spotify"),
"album": (t.get("album") or {}).get("name"),
}
)
return {"artist": artist_name, "market": mk, "tracks": tracks}
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/me")
def spotify_me(authorization: str | None = Header(default=None)) -> dict:
"""
Calls Spotify GET /v1/me.
IMPORTANT: This requires a USER access token (Authorization Code flow),
NOT a Client Credentials token.
Send header:
Authorization: Bearer <user_access_token>
"""
token = _extract_bearer_token(authorization)
try:
return get_current_user_profile(token)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/player/state")
def spotify_player_state(
authorization: str | None = Header(default=None),
market: str | None = None,
) -> dict:
token = _extract_bearer_token(authorization)
try:
return get_playback_state(token, market=market)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/player/devices")
def spotify_player_devices(authorization: str | None = Header(default=None)) -> dict:
token = _extract_bearer_token(authorization)
try:
return get_available_devices(token)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/player/currently-playing")
def spotify_player_currently_playing(
authorization: str | None = Header(default=None),
market: str | None = None,
) -> dict:
token = _extract_bearer_token(authorization)
try:
return get_currently_playing(token, market=market)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
class PlayerPlayRequest(BaseModel):
device_id: str | None = None
context_uri: str | None = None
uris: list[str] | None = None
position_ms: int | None = None
@app.put("/spotify/player/play")
def spotify_player_play(
body: PlayerPlayRequest,
authorization: str | None = Header(default=None),
) -> dict:
token = _extract_bearer_token(authorization)
try:
return start_or_resume_playback(
token,
device_id=body.device_id,
context_uri=body.context_uri,
uris=body.uris,
position_ms=body.position_ms,
)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.put("/spotify/player/pause")
def spotify_player_pause(
authorization: str | None = Header(default=None),
device_id: str | None = None,
) -> dict:
token = _extract_bearer_token(authorization)
try:
return pause_playback(token, device_id=device_id)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.post("/spotify/player/next")
def spotify_player_next(
authorization: str | None = Header(default=None),
device_id: str | None = None,
) -> dict:
token = _extract_bearer_token(authorization)
try:
return skip_next(token, device_id=device_id)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.post("/spotify/player/previous")
def spotify_player_previous(
authorization: str | None = Header(default=None),
device_id: str | None = None,
) -> dict:
token = _extract_bearer_token(authorization)
try:
return skip_previous(token, device_id=device_id)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
class PlayerTransferRequest(BaseModel):
device_ids: list[str]
play: bool | None = None
@app.put("/spotify/player/transfer")
def spotify_player_transfer(
body: PlayerTransferRequest,
authorization: str | None = Header(default=None),
) -> dict:
token = _extract_bearer_token(authorization)
try:
return transfer_playback(token, device_ids=body.device_ids, play=body.play)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
class PlayerQueueRequest(BaseModel):
uri: str
device_id: str | None = None
@app.post("/spotify/player/queue")
def spotify_player_queue(
body: PlayerQueueRequest,
authorization: str | None = Header(default=None),
) -> dict:
token = _extract_bearer_token(authorization)
try:
return add_to_queue(token, uri=body.uri, device_id=body.device_id)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.post("/mobile/spotify/token")
def spotify_mobile_token(body: SpotifyMobileTokenRequest) -> dict:
try:
return exchange_code_for_token_pkce(
code=body.code,
redirect_uri=body.redirect_uri,
code_verifier=body.code_verifier,
)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.post("/mobile/spotify/refresh")
def spotify_mobile_refresh(body: SpotifyMobileRefreshRequest) -> dict:
try:
return refresh_access_token(refresh_token=body.refresh_token)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/artists/id/{artist_id}")
def spotify_get_artist(
artist_id: str,
) -> dict:
token = get_app_access_token()
try:
a = get_artist(token, artist_id=artist_id)
images = a.get("images") or []
image_url = images[0].get("url") if images else None
return {
"id": a.get("id"),
"name": a.get("name"),
"uri": a.get("uri"),
"external_url": (a.get("external_urls") or {}).get("spotify"),
"image_url": image_url,
"followers": (a.get("followers") or {}).get("total"),
"genres": a.get("genres") or [],
"popularity": a.get("popularity"),
}
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/artists/id/{artist_id}/top-tracks")
def spotify_artist_top_tracks(
artist_id: str,
market: str | None = None,
) -> dict:
token = get_app_access_token()
mk = market or "NG"
try:
items = get_artist_top_tracks(token, artist_id=artist_id, market=mk)
except SpotifyAPIError as exc:
if exc.status_code not in (403, 404):
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
try:
a = get_artist(token, artist_id=artist_id)
artist_name = a.get("name") or artist_id
except Exception:
artist_name = artist_id
items = search_tracks_by_artist(token, artist_name=artist_name, limit=50, market=mk)
tracks = []
for t in items:
album = t.get("album") or {}
images = album.get("images") or []
image_url = images[0].get("url") if images else None
tracks.append(
{
"id": t.get("id"),
"name": t.get("name"),
"uri": t.get("uri"),
"artists": [a.get("name") for a in t.get("artists", [])],
"preview_url": t.get("preview_url"),
"external_url": (t.get("external_urls") or {}).get("spotify"),
"album": album.get("name"),
"image_url": image_url,
}
)
return {"artist_id": artist_id, "market": mk, "tracks": tracks}
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/artists/id/{artist_id}/albums")
def spotify_artist_albums(
artist_id: str,
market: str | None = None,
include_groups: str = "album,single",
) -> dict:
token = get_app_access_token()
mk = market or "NG"
try:
items = get_artist_albums(
token,
artist_id=artist_id,
market=mk,
include_groups=include_groups,
)
albums = []
for a in items:
images = a.get("images") or []
image_url = images[0].get("url") if images else None
albums.append(
{
"id": a.get("id"),
"name": a.get("name"),
"uri": a.get("uri"),
"external_url": (a.get("external_urls") or {}).get("spotify"),
"release_date": a.get("release_date"),
"total_tracks": a.get("total_tracks"),
"album_type": a.get("album_type"),
"image_url": image_url,
}
)
return {"artist_id": artist_id, "market": mk, "albums": albums}
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/albums/{album_id}/tracks")
def spotify_album_tracks(album_id: str, market: str | None = None) -> dict:
token = get_app_access_token()
mk = market or "NG"
try:
items = get_album_tracks(token, album_id=album_id, market=mk)
tracks = []
for t in items:
tracks.append(
{
"id": t.get("id"),
"name": t.get("name"),
"uri": t.get("uri"),
"artists": [a.get("name") for a in t.get("artists", [])],
"preview_url": t.get("preview_url"),
"external_url": (t.get("external_urls") or {}).get("spotify"),
"track_number": t.get("track_number"),
}
)
return {"album_id": album_id, "market": mk, "tracks": tracks}
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/artists/id/{artist_id}/all-tracks")
def spotify_artist_all_tracks(artist_id: str, market: str | None = None) -> dict:
token = get_app_access_token()
mk = market or "NG"
try:
tracks = get_all_artist_tracks_via_albums(token, artist_id=artist_id, market=mk)
return {"artist_id": artist_id, "market": mk, "tracks": tracks}
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except Exception as exc:
raise HTTPException(status_code=500, detail="Unexpected server error") from exc
@app.get("/spotify/playlists")
def spotify_my_playlists(
authorization: str | None = Header(default=None),
limit: int = 20,
offset: int = 0,
) -> dict:
"""
List current user's playlists: GET /v1/me/playlists
Scopes:
- playlist-read-private (to include private playlists)
- playlist-read-collaborative (to include collaborative playlists)
"""
token = _extract_bearer_token(authorization)
try:
return list_current_user_playlists(token, limit=limit, offset=offset)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
@app.get("/spotify/playlists/{playlist_id}")
def spotify_get_playlist(
playlist_id: str,
authorization: str | None = Header(default=None),
) -> dict:
token = _extract_bearer_token(authorization)
try:
return get_playlist(token, playlist_id)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
@app.get("/spotify/playlists/{playlist_id}/items")
def spotify_get_playlist_items(
playlist_id: str,
authorization: str | None = Header(default=None),
limit: int = 50,
offset: int = 0,
additional_types: str = "track,episode",
market: str | None = None,
) -> dict:
token = _extract_bearer_token(authorization)
try:
return get_playlist_items(
token,
playlist_id,
limit=limit,
offset=offset,
additional_types=additional_types,
market=market,
)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
class CreatePlaylistRequest(BaseModel):
user_id: str
name: str
description: str | None = None
public: bool = True
collaborative: bool = False
@app.post("/spotify/playlists")
def spotify_create_playlist(
body: CreatePlaylistRequest,
authorization: str | None = Header(default=None),
) -> dict:
"""
Create a playlist for a user: POST /v1/users/{user_id}/playlists
Scopes:
- playlist-modify-public (if public=true)
- playlist-modify-private (if public=false)
"""
token = _extract_bearer_token(authorization)
try:
return create_playlist(
token,
body.user_id,
name=body.name,
description=body.description,
public=body.public,
collaborative=body.collaborative,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
class AddItemsRequest(BaseModel):
uris: list[str]
position: int | None = None
@app.post("/spotify/playlists/{playlist_id}/items")
def spotify_add_items(
playlist_id: str,
body: AddItemsRequest,
authorization: str | None = Header(default=None),
) -> dict:
"""
Add items to a playlist: POST /v1/playlists/{playlist_id}/tracks
Scopes:
- playlist-modify-public or playlist-modify-private (depending on playlist visibility)
"""
token = _extract_bearer_token(authorization)
try:
uris = [to_track_uri(u) for u in body.uris]
return add_items_to_playlist(
token,
playlist_id,
uris=uris,
position=body.position,
)
except SpotifyAPIError as exc:
headers = {}
if exc.retry_after is not None:
headers["Retry-After"] = str(exc.retry_after)
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
@app.get("/spotify/parse")
def spotify_parse(value: str) -> dict:
"""
Debug helper: parse Spotify URI/URL/ID into {type, id, uri, url}.
"""
ref = parse_spotify_ref(value)
return {
"resource_type": ref.resource_type,
"id": ref.id,
"uri": ref.uri,
"url": ref.url,
}
@app.get("/auth/spotify/login")
def spotify_login(
scope: str = "user-read-email user-read-private",
redirect_uri: str | None = None,
) -> RedirectResponse:
"""
Redirect user to Spotify consent screen (Authorization Code flow).
IMPORTANT:
- The redirect_uri MUST exactly match one of the Redirect URIs configured in
your Spotify Developer Dashboard app settings.
- If redirect_uri is omitted, we use SPOTIFY_REDIRECT_URI from env.
"""
ru = redirect_uri or default_redirect_uri()
if not ru:
raise HTTPException(
status_code=500,
detail="Missing redirect URI. Set SPOTIFY_REDIRECT_URI or pass ?redirect_uri=",
)
try:
validate_redirect_uri(ru)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
state = generate_state()
url = build_authorize_url(redirect_uri=ru, scope=scope, state=state)
return RedirectResponse(url=url, status_code=302)
@app.get("/auth/spotify/callback")
def spotify_callback(
code: str | None = None,
state: str | None = None,
error: str | None = None,
redirect_uri: str | None = None,
) -> dict:
"""
Spotify redirects here after login. Exchange `code` for access token.
For production:
- Validate `state` (CSRF protection) using a server-side session store.
- Store refresh token securely (e.g., Postgres) for long-lived sessions.
"""
if error:
raise HTTPException(status_code=400, detail={"error": error, "state": state})
if not code:
raise HTTPException(status_code=400, detail="Missing `code` query parameter")
ru = redirect_uri or default_redirect_uri()
if not ru:
raise HTTPException(
status_code=500,
detail="Missing redirect URI. Set SPOTIFY_REDIRECT_URI or pass ?redirect_uri=",
)
try:
validate_redirect_uri(ru)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
try:
token = exchange_code_for_token(code=code, redirect_uri=ru)
return token
except SpotifyAPIError as exc:
raise HTTPException(status_code=exc.status_code, detail=exc.to_dict()) from exc