| from io import BytesIO |
|
|
| from fastapi import Depends, FastAPI, File, HTTPException, UploadFile, Header |
| from fastapi.responses import RedirectResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from PIL import Image |
| from sqlalchemy.orm import Session |
| from sqlalchemy.exc import SQLAlchemyError |
|
|
| from .db import Base, engine, get_db |
| from .models import MoodLog, RecommendationLog |
| from .mood_text_model import analyze_text_emotion |
| from .mood_face_model import analyze_face_emotion |
| from .spotify_client import recommend_track_for_emotion, get_track_via_bearer, get_available_genre_seeds, get_app_access_token |
| from .spotify_user_api import get_current_user_profile |
| from .spotify_user_reco import recommend_user_track_for_emotion |
| from .spotify_http import SpotifyAPIError |
| from .spotify_oauth import ( |
| build_authorize_url, |
| default_redirect_uri, |
| exchange_code_for_token, |
| exchange_code_for_token_pkce, |
| generate_state, |
| refresh_access_token, |
| validate_redirect_uri, |
| ) |
| from .spotify_playlists_api import ( |
| add_items_to_playlist, |
| create_playlist, |
| get_playlist, |
| get_playlist_items, |
| list_current_user_playlists, |
| ) |
| from .spotify_ids import parse_spotify_ref, to_track_uri |
| from .spotify_catalog_api import ( |
| get_artist, |
| get_album_tracks, |
| get_all_artist_tracks_via_albums, |
| get_artist_albums, |
| search_tracks_by_artist, |
| get_artist_top_tracks, |
| ) |
| from .spotify_playlist_reco import recommend_playlist_for_emotion, pick_random_track_from_playlist |
| from .spotify_player_api import ( |
| add_to_queue, |
| get_available_devices, |
| get_currently_playing, |
| get_playback_state, |
| pause_playback, |
| skip_next, |
| skip_previous, |
| start_or_resume_playback, |
| transfer_playback, |
| ) |
# Create any tables registered on ``Base`` that do not exist yet. This runs
# as an import-time side effect of loading this module.
Base.metadata.create_all(bind=engine)


app = FastAPI(title="VibeCheck – Mood-based Spotify Recommender")


# The endpoints visible in this module authenticate with an explicit
# ``Authorization: Bearer`` header, not cookies, so credentialed CORS is not
# required. FastAPI's CORS documentation states that wildcard origins must not
# be combined with ``allow_credentials=True``; keep the wildcard (any frontend
# may call the API) and turn credentials off. If cookie-based auth is ever
# added, list the allowed origins explicitly and re-enable credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
class TextMoodRequest(BaseModel):
    """Request body for the text-based mood endpoints."""

    # Free-form text that is passed to the text emotion analyzer.
    text: str
|
|
|
|
class RecommendationResponse(BaseModel):
    """Response returned by the ``/mood/*`` endpoints.

    The mood fields are always set. The track fields are required but may be
    ``None`` when no track was found. The playlist fields default to ``None``.
    """

    # Detected emotion and its score.
    mood_label: str
    mood_score: float

    # Recommended track (required keys, nullable values).
    spotify_track_id: str | None
    track_name: str | None
    artists: list[str] | None
    preview_url: str | None
    external_url: str | None

    # Playlist the track was drawn from, when one was used.
    spotify_playlist_id: str | None = None
    playlist_name: str | None = None
    playlist_owner: str | None = None
    playlist_image_url: str | None = None
    playlist_external_url: str | None = None
|
|
|
|
class SpotifyMobileTokenRequest(BaseModel):
    """Request body for ``POST /mobile/spotify/token`` (PKCE code exchange)."""

    # Authorization code, the PKCE verifier that goes with it, and the
    # redirect URI; all three are forwarded to the token exchange helper.
    code: str
    code_verifier: str
    redirect_uri: str
|
|
|
|
class SpotifyMobileRefreshRequest(BaseModel):
    """Request body for ``POST /mobile/spotify/refresh``."""

    # Refresh token forwarded to ``refresh_access_token``.
    refresh_token: str
|
|
|
|
| def _extract_bearer_token(authorization: str | None) -> str: |
| if not authorization: |
| raise HTTPException( |
| status_code=401, |
| detail="Missing Authorization header. Use: Authorization: Bearer <access_token>", |
| ) |
| if not authorization.lower().startswith("bearer "): |
| raise HTTPException( |
| status_code=401, |
| detail="Invalid Authorization header format. Use: Bearer <access_token>", |
| ) |
| return authorization.split(" ", 1)[1].strip() |
|
|
|
|
@app.post("/mood/text", response_model=RecommendationResponse)
def mood_from_text(
    body: TextMoodRequest, db: Session = Depends(get_db)
) -> RecommendationResponse:
    """Detect the mood of a text snippet and recommend a playlist and track.

    A playlist is looked up for the detected emotion and a random track is
    picked from it. If that yields no track, a direct track recommendation is
    used instead. Spotify failures degrade to ``None`` fields in the response
    instead of failing the request. Persisting the mood/recommendation log is
    best-effort: a database error is logged and the response is still
    returned.

    Raises:
        HTTPException: 400 when the text is empty or whitespace only.
    """
    import logging

    logger = logging.getLogger(__name__)

    if not body.text.strip():
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    label, score = analyze_text_emotion(body.text)

    try:
        playlist = recommend_playlist_for_emotion(label) or {}
    except SpotifyAPIError:
        playlist = {}
    playlist_id = playlist.get("id")

    # Only query the playlist when it actually has an id to query by.
    track: dict = {}
    if playlist_id:
        try:
            track = pick_random_track_from_playlist(playlist_id=playlist_id) or {}
        except SpotifyAPIError:
            track = {}

    if not track.get("id"):
        # Fallback path; guarded like the playlist path so a Spotify outage
        # does not turn into an unhandled 500.
        try:
            track = recommend_track_for_emotion(label, source="text") or {}
        except SpotifyAPIError:
            logger.warning("Fallback track recommendation failed", exc_info=True)
            track = {}
    track_id: str | None = track.get("id")

    try:
        mood_log = MoodLog(
            source="text",
            raw_input=body.text[:512],
            emotion_label=label,
            emotion_score=score,
        )
        db.add(mood_log)
        # Flush before reading mood_log.id for the recommendation row
        # (presumably a database-generated key).
        db.flush()
        if track_id:
            db.add(
                RecommendationLog(
                    user_id=None,
                    mood_id=mood_log.id,
                    spotify_track_id=track_id,
                )
            )
        db.commit()
    except SQLAlchemyError:
        db.rollback()
        logger.exception("Could not persist mood log for source 'text'")

    return RecommendationResponse(
        mood_label=label,
        mood_score=score,
        spotify_track_id=track_id,
        track_name=track.get("name"),
        artists=track.get("artists"),
        preview_url=track.get("preview_url"),
        external_url=track.get("external_url"),
        spotify_playlist_id=playlist_id,
        playlist_name=playlist.get("name"),
        playlist_owner=playlist.get("owner"),
        playlist_image_url=playlist.get("image_url"),
        playlist_external_url=playlist.get("external_url"),
    )
|
|
|
|
@app.post("/mood/face", response_model=RecommendationResponse)
async def mood_from_face(
    file: UploadFile = File(...), db: Session = Depends(get_db)
) -> RecommendationResponse:
    """Detect the mood of an uploaded face image and recommend music for it.

    The upload is decoded with Pillow and converted to RGB before analysis.
    A playlist is looked up for the detected emotion and a random track is
    picked from it. If that yields no track, a direct track recommendation is
    used instead. Spotify failures degrade to ``None`` fields. Persisting the
    log rows is best-effort: a database error is logged and the response is
    still returned.

    Raises:
        HTTPException: 400 when the upload cannot be decoded as an image.
    """
    import logging

    logger = logging.getLogger(__name__)

    # NOTE(review): the analysis, Spotify and DB calls below are invoked
    # synchronously from this coroutine; if they block, they block the event
    # loop. Consider offloading them to a thread pool.
    contents = await file.read()
    try:
        image = Image.open(BytesIO(contents)).convert("RGB")
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid image file") from exc

    label, score = analyze_face_emotion(image)

    try:
        playlist = recommend_playlist_for_emotion(label) or {}
    except SpotifyAPIError:
        playlist = {}
    playlist_id = playlist.get("id")

    # Only query the playlist when it actually has an id to query by.
    track: dict = {}
    if playlist_id:
        try:
            track = pick_random_track_from_playlist(playlist_id=playlist_id) or {}
        except SpotifyAPIError:
            track = {}

    if not track.get("id"):
        # Fallback path; guarded like the playlist path so a Spotify outage
        # does not turn into an unhandled 500.
        try:
            track = recommend_track_for_emotion(label, source="face") or {}
        except SpotifyAPIError:
            logger.warning("Fallback track recommendation failed", exc_info=True)
            track = {}
    track_id: str | None = track.get("id")

    try:
        mood_log = MoodLog(
            source="face",
            raw_input=None,
            emotion_label=label,
            emotion_score=score,
        )
        db.add(mood_log)
        # Flush before reading mood_log.id for the recommendation row
        # (presumably a database-generated key).
        db.flush()
        if track_id:
            db.add(
                RecommendationLog(
                    user_id=None,
                    mood_id=mood_log.id,
                    spotify_track_id=track_id,
                )
            )
        db.commit()
    except SQLAlchemyError:
        db.rollback()
        logger.exception("Could not persist mood log for source 'face'")

    return RecommendationResponse(
        mood_label=label,
        mood_score=score,
        spotify_track_id=track_id,
        track_name=track.get("name"),
        artists=track.get("artists"),
        preview_url=track.get("preview_url"),
        external_url=track.get("external_url"),
        spotify_playlist_id=playlist_id,
        playlist_name=playlist.get("name"),
        playlist_owner=playlist.get("owner"),
        playlist_image_url=playlist.get("image_url"),
        playlist_external_url=playlist.get("external_url"),
    )
|
|
|
|
@app.post("/mood/text/user", response_model=RecommendationResponse)
def mood_from_text_user(
    body: TextMoodRequest,
    authorization: str | None = Header(default=None),
    db: Session = Depends(get_db),
) -> RecommendationResponse:
    """Detect the mood of a text snippet and recommend a track for the user.

    The recommendation is made with the caller's Spotify token. If that call
    fails with a Spotify error, the generic recommendation is used instead.
    If the fallback fails too, the track fields are ``None``. Persisting the
    log rows is best-effort: a database error is logged and the response is
    still returned. Playlist fields are always ``None`` for this endpoint.

    Raises:
        HTTPException: 400 when the text is empty or whitespace only; 401 when
            the ``Authorization`` header is missing or malformed.
    """
    import logging

    logger = logging.getLogger(__name__)

    if not body.text.strip():
        raise HTTPException(status_code=400, detail="Text cannot be empty")

    token = _extract_bearer_token(authorization)
    label, score = analyze_text_emotion(body.text)

    try:
        track = recommend_user_track_for_emotion(token, label) or {}
    except SpotifyAPIError:
        # Guard the fallback too: an error raised here would otherwise escape
        # from inside the handler as an unhandled 500.
        try:
            track = recommend_track_for_emotion(label, source="user") or {}
        except SpotifyAPIError:
            logger.warning("Fallback track recommendation failed", exc_info=True)
            track = {}
    track_id: str | None = track.get("id")

    try:
        mood_log = MoodLog(
            source="text_user",
            raw_input=body.text[:512],
            emotion_label=label,
            emotion_score=score,
        )
        db.add(mood_log)
        # Flush before reading mood_log.id for the recommendation row
        # (presumably a database-generated key).
        db.flush()
        if track_id:
            db.add(
                RecommendationLog(
                    user_id=None,
                    mood_id=mood_log.id,
                    spotify_track_id=track_id,
                )
            )
        db.commit()
    except SQLAlchemyError:
        db.rollback()
        logger.exception("Could not persist mood log for source 'text_user'")

    return RecommendationResponse(
        mood_label=label,
        mood_score=score,
        spotify_track_id=track_id,
        track_name=track.get("name"),
        artists=track.get("artists"),
        preview_url=track.get("preview_url"),
        external_url=track.get("external_url"),
        spotify_playlist_id=None,
        playlist_name=None,
        playlist_owner=None,
        playlist_image_url=None,
        playlist_external_url=None,
    )
|
|
|
|
@app.post("/mood/face/user", response_model=RecommendationResponse)
async def mood_from_face_user(
    authorization: str | None = Header(default=None),
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
) -> RecommendationResponse:
    """Detect the mood of an uploaded face image and recommend a user track.

    The recommendation is made with the caller's Spotify token. If that call
    fails with a Spotify error, the generic recommendation is used instead.
    If the fallback fails too, the track fields are ``None``. Persisting the
    log rows is best-effort: a database error is logged and the response is
    still returned. Playlist fields are always ``None`` for this endpoint.

    Raises:
        HTTPException: 401 when the ``Authorization`` header is missing or
            malformed; 400 when the upload cannot be decoded as an image.
    """
    import logging

    logger = logging.getLogger(__name__)

    token = _extract_bearer_token(authorization)

    # NOTE(review): the analysis, Spotify and DB calls below are invoked
    # synchronously from this coroutine; if they block, they block the event
    # loop. Consider offloading them to a thread pool.
    contents = await file.read()
    try:
        image = Image.open(BytesIO(contents)).convert("RGB")
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid image file") from exc

    label, score = analyze_face_emotion(image)

    try:
        track = recommend_user_track_for_emotion(token, label) or {}
    except SpotifyAPIError:
        # Guard the fallback too: an error raised here would otherwise escape
        # from inside the handler as an unhandled 500.
        try:
            track = recommend_track_for_emotion(label, source="user") or {}
        except SpotifyAPIError:
            logger.warning("Fallback track recommendation failed", exc_info=True)
            track = {}
    track_id: str | None = track.get("id")

    try:
        mood_log = MoodLog(
            source="face_user",
            raw_input=None,
            emotion_label=label,
            emotion_score=score,
        )
        db.add(mood_log)
        # Flush before reading mood_log.id for the recommendation row
        # (presumably a database-generated key).
        db.flush()
        if track_id:
            db.add(
                RecommendationLog(
                    user_id=None,
                    mood_id=mood_log.id,
                    spotify_track_id=track_id,
                )
            )
        db.commit()
    except SQLAlchemyError:
        db.rollback()
        logger.exception("Could not persist mood log for source 'face_user'")

    return RecommendationResponse(
        mood_label=label,
        mood_score=score,
        spotify_track_id=track_id,
        track_name=track.get("name"),
        artists=track.get("artists"),
        preview_url=track.get("preview_url"),
        external_url=track.get("external_url"),
        spotify_playlist_id=None,
        playlist_name=None,
        playlist_owner=None,
        playlist_image_url=None,
        playlist_external_url=None,
    )
|
|
|
|
@app.get("/health")
def health() -> dict:
    """Liveness probe: always reports a fixed ``ok`` status."""
    payload = {"status": "ok"}
    return payload
|
|
|
|
@app.get("/spotify/tracks/{track_id}")
def spotify_track(track_id: str, market: str | None = None) -> dict:
    """Demo endpoint returning track info fetched via ``get_track_via_bearer``.

    Per the original notes, the lookup uses an app-only (Client Credentials)
    bearer token.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    try:
        track_info = get_track_via_bearer(track_id, market=market)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return track_info
|
|
|
|
@app.get("/spotify/genres")
def spotify_genres(authorization: str | None = Header(default=None)) -> dict:
    """Return the available genre seeds as ``{"genres": [...]}``.

    An ``Authorization: Bearer <access_token>`` header is optional. When it is
    absent or malformed, the lookup is made with ``access_token=None``; per
    the original notes, an app-only token is used in that case.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    user_token = None
    if authorization:
        try:
            user_token = _extract_bearer_token(authorization)
        except HTTPException:
            # A malformed header is ignored on purpose instead of rejected.
            user_token = None

    try:
        seeds = get_available_genre_seeds(access_token=user_token)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return {"genres": seeds}
|
|
|
|
@app.get("/spotify/artists/{artist_name}/tracks")
def spotify_artist_tracks(
    artist_name: str,
    limit: int = 50,
    market: str | None = None,
) -> dict:
    """Search tracks by artist name and return a trimmed summary of each.

    Args:
        artist_name: Artist name to search for.
        limit: Maximum number of tracks, forwarded to the search helper.
        market: Market code; defaults to ``"NG"`` when omitted.

    Returns:
        ``{"artist", "market", "tracks"}`` where each track carries id, name,
        uri, artist names, preview URL, external URL and album name.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    mk = market or "NG"
    try:
        # Fetch the app token inside the try so a token failure goes through
        # the same error mapping as the search call.
        token = get_app_access_token()
        items = search_tracks_by_artist(token, artist_name=artist_name, limit=limit, market=mk)
        tracks = [
            {
                "id": t.get("id"),
                "name": t.get("name"),
                "uri": t.get("uri"),
                # ``or []`` also covers an explicit ``"artists": null``.
                "artists": [a.get("name") for a in t.get("artists") or []],
                "preview_url": t.get("preview_url"),
                "external_url": (t.get("external_urls") or {}).get("spotify"),
                "album": (t.get("album") or {}).get("name"),
            }
            for t in items
        ]
        return {"artist": artist_name, "market": mk, "tracks": tracks}
    except SpotifyAPIError as exc:
        headers = {}
        if exc.retry_after is not None:
            headers["Retry-After"] = str(exc.retry_after)
        raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Unexpected server error") from exc
|
|
|
|
@app.get("/spotify/me")
def spotify_me(authorization: str | None = Header(default=None)) -> dict:
    """Return the current user's Spotify profile (Spotify ``GET /v1/me``).

    Per the original notes, this needs a USER access token obtained through
    the Authorization Code flow; a Client Credentials token will not work.
    Send it as ``Authorization: Bearer <user_access_token>``.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        profile = get_current_user_profile(user_token)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return profile
|
|
|
|
@app.get("/spotify/player/state")
def spotify_player_state(
    authorization: str | None = Header(default=None),
    market: str | None = None,
) -> dict:
    """Return the result of ``get_playback_state`` for the caller's token.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        state = get_playback_state(user_token, market=market)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return state
|
|
|
|
@app.get("/spotify/player/devices")
def spotify_player_devices(authorization: str | None = Header(default=None)) -> dict:
    """Return the result of ``get_available_devices`` for the caller's token.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        devices = get_available_devices(user_token)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return devices
|
|
|
|
@app.get("/spotify/player/currently-playing")
def spotify_player_currently_playing(
    authorization: str | None = Header(default=None),
    market: str | None = None,
) -> dict:
    """Return the result of ``get_currently_playing`` for the caller's token.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        now_playing = get_currently_playing(user_token, market=market)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return now_playing
|
|
|
|
class PlayerPlayRequest(BaseModel):
    """Request body for ``PUT /spotify/player/play``; every field is optional."""

    # All four values are forwarded unchanged to ``start_or_resume_playback``.
    device_id: str | None = None
    context_uri: str | None = None
    uris: list[str] | None = None
    position_ms: int | None = None
|
|
|
|
@app.put("/spotify/player/play")
def spotify_player_play(
    body: PlayerPlayRequest,
    authorization: str | None = Header(default=None),
) -> dict:
    """Forward the request to ``start_or_resume_playback`` and return its result.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        outcome = start_or_resume_playback(
            user_token,
            device_id=body.device_id,
            context_uri=body.context_uri,
            uris=body.uris,
            position_ms=body.position_ms,
        )
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return outcome
|
|
|
|
@app.put("/spotify/player/pause")
def spotify_player_pause(
    authorization: str | None = Header(default=None),
    device_id: str | None = None,
) -> dict:
    """Forward the request to ``pause_playback`` and return its result.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        outcome = pause_playback(user_token, device_id=device_id)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return outcome
|
|
|
|
@app.post("/spotify/player/next")
def spotify_player_next(
    authorization: str | None = Header(default=None),
    device_id: str | None = None,
) -> dict:
    """Forward the request to ``skip_next`` and return its result.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        outcome = skip_next(user_token, device_id=device_id)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return outcome
|
|
|
|
@app.post("/spotify/player/previous")
def spotify_player_previous(
    authorization: str | None = Header(default=None),
    device_id: str | None = None,
) -> dict:
    """Forward the request to ``skip_previous`` and return its result.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        outcome = skip_previous(user_token, device_id=device_id)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return outcome
|
|
|
|
class PlayerTransferRequest(BaseModel):
    """Request body for ``PUT /spotify/player/transfer``."""

    # Both values are forwarded unchanged to ``transfer_playback``.
    device_ids: list[str]
    play: bool | None = None
|
|
|
|
@app.put("/spotify/player/transfer")
def spotify_player_transfer(
    body: PlayerTransferRequest,
    authorization: str | None = Header(default=None),
) -> dict:
    """Forward the request to ``transfer_playback`` and return its result.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        outcome = transfer_playback(
            user_token,
            device_ids=body.device_ids,
            play=body.play,
        )
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return outcome
|
|
|
|
class PlayerQueueRequest(BaseModel):
    """Request body for ``POST /spotify/player/queue``."""

    # Both values are forwarded unchanged to ``add_to_queue``.
    uri: str
    device_id: str | None = None
|
|
|
|
@app.post("/spotify/player/queue")
def spotify_player_queue(
    body: PlayerQueueRequest,
    authorization: str | None = Header(default=None),
) -> dict:
    """Forward the request to ``add_to_queue`` and return its result.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            500 on any other failure.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        outcome = add_to_queue(user_token, uri=body.uri, device_id=body.device_id)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return outcome
|
|
|
|
|
|
@app.post("/mobile/spotify/token")
def spotify_mobile_token(body: SpotifyMobileTokenRequest) -> dict:
    """Exchange an authorization code for tokens via the PKCE helper.

    Returns whatever ``exchange_code_for_token_pkce`` returns.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    try:
        tokens = exchange_code_for_token_pkce(
            code=body.code,
            redirect_uri=body.redirect_uri,
            code_verifier=body.code_verifier,
        )
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return tokens
|
|
|
|
@app.post("/mobile/spotify/refresh")
def spotify_mobile_refresh(body: SpotifyMobileRefreshRequest) -> dict:
    """Return the result of ``refresh_access_token`` for the given refresh token.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    try:
        tokens = refresh_access_token(refresh_token=body.refresh_token)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except Exception as err:
        raise HTTPException(status_code=500, detail="Unexpected server error") from err
    return tokens
|
|
@app.get("/spotify/artists/id/{artist_id}")
def spotify_get_artist(
    artist_id: str,
) -> dict:
    """Return a trimmed summary of an artist looked up by Spotify id.

    Returns:
        A dict with id, name, uri, external URL, first image URL (or
        ``None``), follower total, genres and popularity.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    try:
        # Fetch the app token inside the try so a token failure goes through
        # the same error mapping as the artist lookup.
        token = get_app_access_token()
        artist = get_artist(token, artist_id=artist_id)
        images = artist.get("images") or []
        return {
            "id": artist.get("id"),
            "name": artist.get("name"),
            "uri": artist.get("uri"),
            "external_url": (artist.get("external_urls") or {}).get("spotify"),
            "image_url": images[0].get("url") if images else None,
            "followers": (artist.get("followers") or {}).get("total"),
            "genres": artist.get("genres") or [],
            "popularity": artist.get("popularity"),
        }
    except SpotifyAPIError as exc:
        headers = {}
        if exc.retry_after is not None:
            headers["Retry-After"] = str(exc.retry_after)
        raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Unexpected server error") from exc
|
|
|
|
@app.get("/spotify/artists/id/{artist_id}/top-tracks")
def spotify_artist_top_tracks(
    artist_id: str,
    market: str | None = None,
) -> dict:
    """Return an artist's top tracks, falling back to a search by name.

    When the top-tracks lookup fails with 403 or 404, the artist's name is
    resolved (using the raw id if that lookup fails too) and a track search
    by that name is used instead. Both paths produce the same response shape.

    Args:
        artist_id: Spotify artist id.
        market: Market code; defaults to ``"NG"`` when omitted.

    Returns:
        ``{"artist_id", "market", "tracks"}`` where each track carries id,
        name, uri, artist names, preview URL, external URL, album name and
        the first album image URL (or ``None``).

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error from either path; 500 on any
            other failure.
    """
    mk = market or "NG"
    try:
        token = get_app_access_token()
        try:
            items = get_artist_top_tracks(token, artist_id=artist_id, market=mk)
        except SpotifyAPIError as exc:
            if exc.status_code not in (403, 404):
                # Not a "top tracks unavailable" case: let the outer handler
                # map it to an HTTP error.
                raise

            try:
                artist = get_artist(token, artist_id=artist_id)
                artist_name = artist.get("name") or artist_id
            except Exception:
                # Best effort: search by the raw id if the name lookup fails.
                artist_name = artist_id

            items = search_tracks_by_artist(token, artist_name=artist_name, limit=50, market=mk)

        # Built for both the primary and the fallback result.
        tracks = []
        for t in items:
            album = t.get("album") or {}
            images = album.get("images") or []
            tracks.append(
                {
                    "id": t.get("id"),
                    "name": t.get("name"),
                    "uri": t.get("uri"),
                    "artists": [a.get("name") for a in t.get("artists") or []],
                    "preview_url": t.get("preview_url"),
                    "external_url": (t.get("external_urls") or {}).get("spotify"),
                    "album": album.get("name"),
                    "image_url": images[0].get("url") if images else None,
                }
            )
        return {"artist_id": artist_id, "market": mk, "tracks": tracks}
    except SpotifyAPIError as exc:
        headers = {}
        if exc.retry_after is not None:
            headers["Retry-After"] = str(exc.retry_after)
        raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Unexpected server error") from exc
|
|
|
|
@app.get("/spotify/artists/id/{artist_id}/albums")
def spotify_artist_albums(
    artist_id: str,
    market: str | None = None,
    include_groups: str = "album,single",
) -> dict:
    """Return a trimmed summary of an artist's albums.

    Args:
        artist_id: Spotify artist id.
        market: Market code; defaults to ``"NG"`` when omitted.
        include_groups: Forwarded unchanged to ``get_artist_albums``.

    Returns:
        ``{"artist_id", "market", "albums"}`` where each album carries id,
        name, uri, external URL, release date, track total, album type and
        the first image URL (or ``None``).

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    mk = market or "NG"
    try:
        # Fetch the app token inside the try so a token failure goes through
        # the same error mapping as the album lookup.
        token = get_app_access_token()
        items = get_artist_albums(
            token,
            artist_id=artist_id,
            market=mk,
            include_groups=include_groups,
        )
        albums = []
        for album in items:
            images = album.get("images") or []
            albums.append(
                {
                    "id": album.get("id"),
                    "name": album.get("name"),
                    "uri": album.get("uri"),
                    "external_url": (album.get("external_urls") or {}).get("spotify"),
                    "release_date": album.get("release_date"),
                    "total_tracks": album.get("total_tracks"),
                    "album_type": album.get("album_type"),
                    "image_url": images[0].get("url") if images else None,
                }
            )
        return {"artist_id": artist_id, "market": mk, "albums": albums}
    except SpotifyAPIError as exc:
        headers = {}
        if exc.retry_after is not None:
            headers["Retry-After"] = str(exc.retry_after)
        raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Unexpected server error") from exc
|
|
|
|
@app.get("/spotify/albums/{album_id}/tracks")
def spotify_album_tracks(album_id: str, market: str | None = None) -> dict:
    """Return a trimmed summary of the tracks on an album.

    Args:
        album_id: Spotify album id.
        market: Market code; defaults to ``"NG"`` when omitted.

    Returns:
        ``{"album_id", "market", "tracks"}`` where each track carries id,
        name, uri, artist names, preview URL, external URL and track number.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    mk = market or "NG"
    try:
        # Fetch the app token inside the try so a token failure goes through
        # the same error mapping as the track lookup.
        token = get_app_access_token()
        items = get_album_tracks(token, album_id=album_id, market=mk)
        tracks = [
            {
                "id": t.get("id"),
                "name": t.get("name"),
                "uri": t.get("uri"),
                # ``or []`` also covers an explicit ``"artists": null``.
                "artists": [a.get("name") for a in t.get("artists") or []],
                "preview_url": t.get("preview_url"),
                "external_url": (t.get("external_urls") or {}).get("spotify"),
                "track_number": t.get("track_number"),
            }
            for t in items
        ]
        return {"album_id": album_id, "market": mk, "tracks": tracks}
    except SpotifyAPIError as exc:
        headers = {}
        if exc.retry_after is not None:
            headers["Retry-After"] = str(exc.retry_after)
        raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Unexpected server error") from exc
|
|
|
|
@app.get("/spotify/artists/id/{artist_id}/all-tracks")
def spotify_artist_all_tracks(artist_id: str, market: str | None = None) -> dict:
    """Return every track ``get_all_artist_tracks_via_albums`` finds for an artist.

    Args:
        artist_id: Spotify artist id.
        market: Market code; defaults to ``"NG"`` when omitted.

    Returns:
        ``{"artist_id", "market", "tracks"}`` with the helper's result
        passed through unchanged as ``tracks``.

    Raises:
        HTTPException: with the Spotify status code (and ``Retry-After`` when
            supplied) on a Spotify API error; 500 on any other failure.
    """
    mk = market or "NG"
    try:
        # Fetch the app token inside the try so a token failure goes through
        # the same error mapping as the track lookup.
        token = get_app_access_token()
        tracks = get_all_artist_tracks_via_albums(token, artist_id=artist_id, market=mk)
        return {"artist_id": artist_id, "market": mk, "tracks": tracks}
    except SpotifyAPIError as exc:
        headers = {}
        if exc.retry_after is not None:
            headers["Retry-After"] = str(exc.retry_after)
        raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Unexpected server error") from exc
|
|
|
|
@app.get("/spotify/playlists")
def spotify_my_playlists(
    authorization: str | None = Header(default=None),
    limit: int = 20,
    offset: int = 0,
) -> dict:
    """List the current user's playlists (Spotify ``GET /v1/me/playlists``).

    Scopes, per the original notes:
    - playlist-read-private (to include private playlists)
    - playlist-read-collaborative (to include collaborative playlists)

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        page = list_current_user_playlists(user_token, limit=limit, offset=offset)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    return page
|
|
|
|
@app.get("/spotify/playlists/{playlist_id}")
def spotify_get_playlist(
    playlist_id: str,
    authorization: str | None = Header(default=None),
) -> dict:
    """Return the result of ``get_playlist`` for the given playlist id.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        playlist = get_playlist(user_token, playlist_id)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    return playlist
|
|
|
|
@app.get("/spotify/playlists/{playlist_id}/items")
def spotify_get_playlist_items(
    playlist_id: str,
    authorization: str | None = Header(default=None),
    limit: int = 50,
    offset: int = 0,
    additional_types: str = "track,episode",
    market: str | None = None,
) -> dict:
    """Return the result of ``get_playlist_items`` for the given playlist id.

    The paging, type and market arguments are forwarded unchanged.

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error.
    """
    user_token = _extract_bearer_token(authorization)
    paging = {
        "limit": limit,
        "offset": offset,
        "additional_types": additional_types,
        "market": market,
    }
    try:
        page = get_playlist_items(user_token, playlist_id, **paging)
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    return page
|
|
|
|
class CreatePlaylistRequest(BaseModel):
    """Request body for ``POST /spotify/playlists``."""

    # Owner and name of the playlist to create.
    user_id: str
    name: str

    # Optional settings; all are forwarded unchanged to ``create_playlist``.
    description: str | None = None
    public: bool = True
    collaborative: bool = False
|
|
|
|
@app.post("/spotify/playlists")
def spotify_create_playlist(
    body: CreatePlaylistRequest,
    authorization: str | None = Header(default=None),
) -> dict:
    """Create a playlist for a user (Spotify ``POST /v1/users/{user_id}/playlists``).

    Scopes, per the original notes:
    - playlist-modify-public (if public=true)
    - playlist-modify-private (if public=false)

    Raises:
        HTTPException: 401 for a missing/malformed header; 400 when
            ``create_playlist`` raises ``ValueError``; the Spotify status code
            (and ``Retry-After`` when supplied) on a Spotify API error.
    """
    user_token = _extract_bearer_token(authorization)
    try:
        created = create_playlist(
            user_token,
            body.user_id,
            name=body.name,
            description=body.description,
            public=body.public,
            collaborative=body.collaborative,
        )
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    return created
|
|
|
|
class AddItemsRequest(BaseModel):
    """Request body for ``POST /spotify/playlists/{playlist_id}/items``."""

    # Track references; each is passed through ``to_track_uri`` by the endpoint.
    uris: list[str]
    # Forwarded unchanged to ``add_items_to_playlist``.
    position: int | None = None
|
|
|
|
@app.post("/spotify/playlists/{playlist_id}/items")
def spotify_add_items(
    playlist_id: str,
    body: AddItemsRequest,
    authorization: str | None = Header(default=None),
) -> dict:
    """Add items to a playlist (Spotify ``POST /v1/playlists/{playlist_id}/tracks``).

    Every entry in ``body.uris`` is converted with ``to_track_uri`` first.

    Scopes, per the original notes:
    - playlist-modify-public or playlist-modify-private (depending on
      playlist visibility)

    Raises:
        HTTPException: 401 for a missing/malformed header; the Spotify status
            code (and ``Retry-After`` when supplied) on a Spotify API error;
            400 when a ``ValueError`` is raised (e.g. by ``to_track_uri``).
    """
    user_token = _extract_bearer_token(authorization)
    try:
        track_uris = list(map(to_track_uri, body.uris))
        added = add_items_to_playlist(
            user_token,
            playlist_id,
            uris=track_uris,
            position=body.position,
        )
    except SpotifyAPIError as err:
        retry_after = err.retry_after
        extra_headers = {} if retry_after is None else {"Retry-After": str(retry_after)}
        raise HTTPException(
            status_code=err.status_code,
            detail=err.to_dict(),
            headers=extra_headers,
        ) from err
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return added
|
|
|
|
@app.get("/spotify/parse")
def spotify_parse(value: str) -> dict:
    """Debug helper: parse a Spotify URI/URL/ID into its components.

    Args:
        value: The Spotify reference to parse.

    Returns:
        ``{"resource_type", "id", "uri", "url"}`` taken from the parsed
        reference.

    Raises:
        HTTPException: 400 when the value cannot be parsed.
    """
    try:
        ref = parse_spotify_ref(value)
    except ValueError as exc:
        # Presumably the parser signals bad input with ValueError, as the
        # sibling to_track_uri helper does - confirm. Report it as a client
        # error instead of an unhandled 500.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {
        "resource_type": ref.resource_type,
        "id": ref.id,
        "uri": ref.uri,
        "url": ref.url,
    }
|
|
|
|
@app.get("/auth/spotify/login")
def spotify_login(
    scope: str = "user-read-email user-read-private",
    redirect_uri: str | None = None,
) -> RedirectResponse:
    """Send the user to Spotify's consent screen (Authorization Code flow).

    IMPORTANT (from the original notes):
    - The redirect URI must exactly match one of the Redirect URIs configured
      for the app in the Spotify Developer Dashboard.
    - When ``redirect_uri`` is omitted, SPOTIFY_REDIRECT_URI from the
      environment is used.

    NOTE: the generated ``state`` is only embedded in the authorize URL; it
    is not stored anywhere in this function.

    Raises:
        HTTPException: 500 when no redirect URI is available; 400 when the
            redirect URI fails validation.
    """
    target_uri = redirect_uri or default_redirect_uri()
    if not target_uri:
        raise HTTPException(
            status_code=500,
            detail="Missing redirect URI. Set SPOTIFY_REDIRECT_URI or pass ?redirect_uri=",
        )

    try:
        validate_redirect_uri(target_uri)
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err)) from err

    authorize_url = build_authorize_url(
        redirect_uri=target_uri,
        scope=scope,
        state=generate_state(),
    )
    return RedirectResponse(url=authorize_url, status_code=302)
|
|
|
|
@app.get("/auth/spotify/callback")
def spotify_callback(
    code: str | None = None,
    state: str | None = None,
    error: str | None = None,
    redirect_uri: str | None = None,
) -> dict:
    """Handle Spotify's redirect after login and exchange ``code`` for tokens.

    For production (from the original notes):
    - Validate ``state`` (CSRF protection) using a server-side session store;
      this function only echoes it back in the error detail.
    - Store the refresh token securely (e.g., Postgres) for long-lived
      sessions.

    Returns:
        Whatever ``exchange_code_for_token`` returns.

    Raises:
        HTTPException: 400 when Spotify reported an ``error``, when ``code``
            is missing, or when the redirect URI fails validation; 500 when
            no redirect URI is available; the Spotify status code (and
            ``Retry-After`` when supplied) on a Spotify API error.
    """
    if error:
        raise HTTPException(status_code=400, detail={"error": error, "state": state})
    if not code:
        raise HTTPException(status_code=400, detail="Missing `code` query parameter")

    ru = redirect_uri or default_redirect_uri()
    if not ru:
        raise HTTPException(
            status_code=500,
            detail="Missing redirect URI. Set SPOTIFY_REDIRECT_URI or pass ?redirect_uri=",
        )
    try:
        validate_redirect_uri(ru)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    try:
        return exchange_code_for_token(code=code, redirect_uri=ru)
    except SpotifyAPIError as exc:
        # Forward Retry-After like the other Spotify-backed endpoints do, so
        # rate-limited clients know when to retry.
        headers = {}
        if exc.retry_after is not None:
            headers["Retry-After"] = str(exc.retry_after)
        raise HTTPException(status_code=exc.status_code, detail=exc.to_dict(), headers=headers) from exc
|
|
|
|
|
|