"""Spotify data extractor → CSV (80s/90s rock).

Searches Spotify for playlists matching a set of queries, collects every
track released within a year window, tags each track with a best-effort
language guess and the primary artist's genres, and writes the result to CSV.

Requires Spotify client credentials (env CLIENT_ID_SPOTIFY /
CLIENT_SECRET_SPOTIFY, or --client-id / --client-secret).
"""

import os
import time
import base64
import argparse
import re
from urllib.parse import urlparse

import requests
import pandas as pd
from tqdm import tqdm
from langdetect import detect, DetectorFactory

DetectorFactory.seed = 0  # deterministic language-detection results

SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_API_BASE = "https://api.spotify.com/v1"


def env(var, default=None, required=False):
    """Return the value of env var `var`, or exit if `required` and unset/empty."""
    v = os.getenv(var, default)
    if required and not v:
        raise SystemExit(f"Missing required env var: {var}")
    return v


def get_access_token(client_id: str, client_secret: str) -> str:
    """Obtain an app-level bearer token via the Client Credentials flow.

    Raises `requests.HTTPError` if Spotify rejects the credentials.
    """
    auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    headers = {
        "Authorization": f"Basic {auth}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    data = {"grant_type": "client_credentials"}
    r = requests.post(SPOTIFY_TOKEN_URL, headers=headers, data=data, timeout=30)
    r.raise_for_status()
    return r.json()["access_token"]


def request_spotify(method: str, path: str, token: str, params=None, max_retries=5):
    """Issue an authenticated request to the Spotify Web API and return JSON.

    `path` may be an absolute URL (e.g. a pagination `next` link) or a path
    relative to the API base. Honors 429 Retry-After, retries 5xx with
    exponential backoff, and raises RuntimeError on other 4xx responses or
    when `max_retries` is exhausted.
    """
    url = path if path.startswith("http") else f"{SPOTIFY_API_BASE}{path}"
    headers = {"Authorization": f"Bearer {token}"}
    backoff = 1.0
    for attempt in range(max_retries):
        r = requests.request(method, url, headers=headers, params=params, timeout=30)
        if r.status_code == 429:
            # Rate-limited: honor the server's Retry-After hint.
            # Guard the parse — a malformed header must not crash the retry loop.
            try:
                retry_after = int(r.headers.get("Retry-After", "1"))
            except ValueError:
                retry_after = 1
            time.sleep(retry_after + 0.1)
            continue
        if 200 <= r.status_code < 300:
            return r.json()
        if r.status_code >= 500:
            # Transient server error: exponential backoff, capped at 10s.
            time.sleep(backoff)
            backoff = min(backoff * 2, 10)
            continue
        # 4xx errors (other than 429) are not retryable.
        try:
            detail = r.json()
        except Exception:
            detail = r.text
        raise RuntimeError(f"Spotify API {r.status_code}: {detail}")
    raise RuntimeError("Max retries exceeded for Spotify API")


def search_playlists(query: str, token: str, limit=20, pages=3):
    """Search playlists by free text.

    Returns a list of dicts with keys `id`, `name`, `owner`. Fetches up to
    `pages` pages of `limit` results each; stops early on a short page.
    """
    results = []
    offset = 0
    for _ in range(pages):
        data = request_spotify(
            "GET",
            "/search",
            token,
            params={"q": query, "type": "playlist", "limit": limit, "offset": offset},
        )
        items = data.get("playlists", {}).get("items", [])
        for it in items:
            if it is None:
                # Spotify occasionally returns null entries in search results.
                continue
            results.append({
                "id": it["id"],
                "name": it.get("name", ""),
                "owner": (it.get("owner") or {}).get("display_name", ""),
            })
        if len(items) < limit:
            break
        offset += limit
        time.sleep(0.1)  # be polite between search pages
    return results


def parse_playlist_id(s: str) -> str:
    """Accept an ID, https://open.spotify.com/playlist/{id} URL, or spotify:playlist:{id} URI.

    Raises ValueError if `s` matches none of the recognized formats.
    """
    s = s.strip()
    # Bare 22-char base62 playlist ID.
    if re.match(r"^[A-Za-z0-9]{22}$", s):
        return s
    if s.startswith("spotify:playlist:"):
        return s.split(":")[-1]
    if "open.spotify.com" in s:
        path = urlparse(s).path
        parts = [p for p in path.split("/") if p]
        if len(parts) >= 2 and parts[0] == "playlist":
            return parts[1]
    raise ValueError(f"Unrecognized playlist format: {s}")


def iter_playlist_tracks(playlist_id: str, token: str, max_pages=None):
    """Yield every track object of a playlist, following pagination.

    Yields the raw `track` dict of each item (may be None for removed/local
    tracks — callers must handle that). `max_pages` optionally limits paging.
    """
    fields = "items(track(name,id,artists(name,id),album(name,release_date,release_date_precision))),next"
    url = f"/playlists/{playlist_id}/tracks"
    params = {"limit": 100, "fields": fields}
    total = 0
    page = 0
    while True:
        data = request_spotify("GET", url, token, params=params)
        items = data.get("items", [])
        for it in items:
            yield it.get("track")
        total += len(items)
        page += 1
        next_url = data.get("next")
        if not next_url:
            break
        if max_pages and page >= max_pages:
            break
        url = next_url
        params = None  # the `next` URL already carries the query string
        time.sleep(0.05)


def get_artist_genres(artist_id: str, token: str):
    """Return the artist's genre list (possibly empty)."""
    data = request_spotify("GET", f"/artists/{artist_id}", token)
    return data.get("genres", []) or []


def detect_lang(title: str, artist: str) -> str:
    """Best-effort classification of a track as "Spanish" or "English".

    Tries langdetect first; on failure or any other language, falls back to a
    Spanish-accent-character heuristic, defaulting to "English".
    """
    text = f"{title} {artist}".strip()
    try:
        code = detect(text)
        if code.startswith("es"):
            return "Spanish"
        if code.startswith("en"):
            return "English"
    except Exception:
        # langdetect raises on very short / symbol-only text; fall through.
        pass
    # Heuristic: Spanish accented characters strongly suggest Spanish.
    if re.search(r"[áéíóúñÁÉÍÓÚÑ]", text):
        return "Spanish"
    return "English"


def year_from_release_date(rd: str, precision: str) -> int | None:
    """Extract the release year from Spotify's release_date string.

    `precision` is one of "year", "month", "day"; returns None when the date
    is missing or unparseable.
    """
    if not rd:
        return None
    try:
        if precision == "year":
            return int(rd)
        return int(rd[:4])
    except Exception:
        return None


def collect_from_playlists(playlist_ids, token, yr_min=1980, yr_max=1999, fetch_genres=True):
    """Collect track rows from the given playlists, filtered to [yr_min, yr_max].

    Deduplicates by (title, artist, year) across all playlists. When
    `fetch_genres` is true, looks up the primary artist's genres (cached per
    artist to avoid redundant API calls). Returns a list of row dicts.
    """
    rows = []
    seen = set()
    genre_cache = {}  # artist_id -> genres; one API call per artist, not per track
    for pid in playlist_ids:
        data = request_spotify("GET", f"/playlists/{pid}", token, params={"fields": "name,id"})
        pl_name = data.get("name", "")
        for tr in iter_playlist_tracks(pid, token):
            if not tr:
                # Removed or local tracks come back as None.
                continue
            track_id = tr.get("id")
            title = (tr.get("name") or "").strip()
            artists = tr.get("artists") or []
            primary = artists[0] if artists else {}
            artist_name = (primary.get("name") or "").strip()
            album = tr.get("album") or {}
            rd = album.get("release_date")
            rdp = album.get("release_date_precision", "day")
            year = year_from_release_date(rd, rdp)
            if year is None or not (yr_min <= year <= yr_max):
                continue
            # Dedup key across playlists.
            key = (title.lower(), artist_name.lower(), year)
            if key in seen:
                continue
            seen.add(key)
            # Primary artist's genres (best-effort; failures yield empty list).
            genres = []
            artist_id = primary.get("id")
            if fetch_genres and artist_id:
                if artist_id in genre_cache:
                    genres = genre_cache[artist_id]
                else:
                    try:
                        genres = get_artist_genres(artist_id, token)
                    except Exception:
                        genres = []
                    genre_cache[artist_id] = genres
                    time.sleep(0.02)  # throttle only on actual API calls
            language = detect_lang(title, artist_name)
            rows.append({
                "title": title,
                "artist": artist_name,
                "year": year,
                "language": language,
                "artist_genres": "|".join(genres),
                "playlist_name": pl_name,
                "playlist_id": pid,
                "track_id": track_id,
            })
    return rows


def main():
    """CLI entry point: search playlists, collect 80s/90s tracks, write CSV."""
    parser = argparse.ArgumentParser(description="Spotify data extractor → CSV (80s/90s rock)")
    parser.add_argument("--client-id", default=os.getenv("CLIENT_ID_SPOTIFY"),
                        help="Spotify Client ID")
    # NOTE: env var name fixed — previously read "CLIENT_SECRET_SPOTIFy" (typo),
    # which could never match on case-sensitive platforms.
    parser.add_argument("--client-secret", default=os.getenv("CLIENT_SECRET_SPOTIFY"),
                        help="Spotify Client Secret")
    parser.add_argument("--queries",
                        default="80s rock;90s rock;rock en español 80;rock en español 90;rock and roll classics",
                        help="Consultas de búsqueda de playlists separadas por ';'")
    parser.add_argument("--limit-playlists", type=int, default=10,
                        help="Máx playlists por consulta")
    parser.add_argument("--pages-per-query", type=int, default=2,
                        help="Páginas de búsqueda por consulta")
    parser.add_argument("--extra-playlists",
                        default="https://open.spotify.com/playlist/37i9dQZF1DXc3KygMa1OE7,https://open.spotify.com/playlist/37i9dQZF1DX1spT6G94GFC",
                        help="IDs/URLs de playlists separadas por ','")
    parser.add_argument("--min-year", type=int, default=1980)
    parser.add_argument("--max-year", type=int, default=1999)
    parser.add_argument("--no-genres", action="store_true",
                        help="No consultar géneros del artista (más rápido)")
    parser.add_argument("--out", default="spotify_rock_80s_90s.csv")
    args = parser.parse_args()

    if not args.client_id or not args.client_secret:
        # Message fixed to name the env vars the code actually reads.
        raise SystemExit(
            "Set --client-id/--client-secret or env CLIENT_ID_SPOTIFY / CLIENT_SECRET_SPOTIFY"
        )

    token = get_access_token(args.client_id, args.client_secret)

    # 1) Find playlists via the search queries.
    playlist_ids = set()
    for q in [q.strip() for q in args.queries.split(";") if q.strip()]:
        pls = search_playlists(q, token, limit=args.limit_playlists, pages=args.pages_per_query)
        for p in pls:
            playlist_ids.add(p["id"])

    # 2) Add explicit extra playlists (URLs or IDs); skip empty segments so a
    #    trailing comma does not crash parse_playlist_id.
    if args.extra_playlists.strip():
        for s in args.extra_playlists.split(","):
            if s.strip():
                playlist_ids.add(parse_playlist_id(s))

    playlist_ids = list(playlist_ids)
    print(f"Total playlists a procesar: {len(playlist_ids)}")

    # 3) Collect tracks within the year window.
    rows = collect_from_playlists(
        playlist_ids, token,
        yr_min=args.min_year, yr_max=args.max_year,
        fetch_genres=not args.no_genres,
    )

    # 4) Write the CSV (second dedup pass is a cheap safety net).
    df = pd.DataFrame(rows, columns=[
        "title", "artist", "year", "language", "artist_genres",
        "playlist_name", "playlist_id", "track_id",
    ])
    df.drop_duplicates(subset=["title", "artist", "year"], inplace=True)
    df.to_csv(args.out, index=False)
    print(f"Escrito {args.out} con {len(df)} filas.")


if __name__ == "__main__":
    main()