Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import base64 | |
| import argparse | |
| import re | |
| from urllib.parse import urlparse | |
| import requests | |
| import pandas as pd | |
| from tqdm import tqdm | |
| from langdetect import detect, DetectorFactory | |
# Spotify endpoints used by the rest of the script.
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"

# Pin langdetect's seed so language detection gives deterministic results.
DetectorFactory.seed = 0
def env(var, default=None, required=False):
    """Return the value of environment variable ``var``.

    Falls back to ``default`` when the variable is unset. When ``required``
    is true and the resulting value is missing or empty, the process is
    terminated through ``SystemExit`` with a message naming the variable.
    """
    value = os.environ.get(var, default)
    if value or not required:
        return value
    raise SystemExit(f"Missing required env var: {var}")
def get_access_token(client_id: str, client_secret: str) -> str:
    """Obtain an access token using the client-credentials grant.

    The ``client_id:client_secret`` pair is Base64-encoded into a Basic
    ``Authorization`` header and POSTed to ``SPOTIFY_TOKEN_URL``. The
    ``access_token`` field of the JSON response is returned.

    Raises:
        requests.HTTPError: If the token endpoint answers with an HTTP error
            status (raised by ``raise_for_status``).
        KeyError: If the response JSON has no ``access_token`` field.
    """
    credentials = f"{client_id}:{client_secret}".encode()
    basic = base64.b64encode(credentials).decode()
    response = requests.post(
        SPOTIFY_TOKEN_URL,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={"grant_type": "client_credentials"},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    return payload["access_token"]
def request_spotify(method: str, path: str, token: str, params=None, max_retries=5):
    """Call the Spotify Web API and return the decoded JSON body.

    Args:
        method: HTTP verb, e.g. ``"GET"``.
        path: Either an absolute URL (anything starting with ``http``, such
            as a pagination ``next`` link) or a path that is appended to
            ``SPOTIFY_API_BASE``.
        token: Bearer access token sent in the ``Authorization`` header.
        params: Optional query parameters.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The parsed JSON response, or ``None`` when a successful response
        carries no body (for example 204 No Content).

    Raises:
        RuntimeError: On a 4xx response other than 429, or when every
            attempt was rate-limited (429) or hit a server error (5xx).
    """
    url = path if path.startswith("http") else f"{SPOTIFY_API_BASE}{path}"
    headers = {"Authorization": f"Bearer {token}"}
    backoff = 1.0
    last_status = None
    for _ in range(max_retries):
        r = requests.request(method, url, headers=headers, params=params, timeout=30)
        last_status = r.status_code

        if r.status_code == 429:
            # Retry-After is expected to be whole seconds, but do not crash
            # if it is malformed; wait for the current backoff instead.
            try:
                retry_after = int(r.headers.get("Retry-After", "1"))
            except (TypeError, ValueError):
                retry_after = backoff
            # time.sleep() rejects negative values, so clamp at zero.
            time.sleep(max(retry_after, 0) + 0.1)
            continue

        if 200 <= r.status_code < 300:
            # A successful response may legitimately have no body; r.json()
            # would raise on it.
            if r.status_code == 204 or not r.content:
                return None
            return r.json()

        if r.status_code >= 500:
            # Transient server error: exponential backoff capped at 10 s.
            time.sleep(backoff)
            backoff = min(backoff * 2, 10)
            continue

        # Remaining 4xx errors (not 429) are not retried.
        try:
            detail = r.json()
        except Exception:
            detail = r.text
        raise RuntimeError(f"Spotify API {r.status_code}: {detail}")

    raise RuntimeError(
        f"Max retries exceeded for Spotify API (last status: {last_status})"
    )
def search_playlists(query: str, token: str, limit=20, pages=3):
    """Search playlists by free text.

    Requests up to ``pages`` result pages of ``limit`` items each from the
    ``/search`` endpoint. Returns a list of dicts with the keys ``id``,
    ``name`` and ``owner`` (the owner's display name). Paging stops early
    as soon as a page holds fewer than ``limit`` items.
    """
    found = []
    for page_no in range(pages):
        payload = request_spotify(
            "GET",
            "/search",
            token,
            params={
                "q": query,
                "type": "playlist",
                "limit": limit,
                "offset": page_no * limit,
            },
        )
        page_items = payload.get("playlists", {}).get("items", [])
        # The result list can contain null entries; those are skipped.
        found.extend(
            {
                "id": entry["id"],
                "name": entry.get("name", ""),
                "owner": (entry.get("owner") or {}).get("display_name", ""),
            }
            for entry in page_items
            if entry is not None
        )
        if len(page_items) < limit:
            break
        # Short pause after every full page before asking for the next one.
        time.sleep(0.1)
    return found
def parse_playlist_id(s: str) -> str:
    """Extract a playlist ID from an ID, a Spotify URI or an open.spotify.com URL.

    Accepted forms:
        * a bare 22-character alphanumeric ID;
        * ``spotify:playlist:{id}`` and ``spotify:user:{user}:playlist:{id}``;
        * ``open.spotify.com`` links, with or without a scheme, a query string
          or extra path segments before ``playlist`` (for example
          ``/intl-es/playlist/{id}`` or ``/embed/playlist/{id}``).

    Args:
        s: The user-supplied playlist reference; surrounding whitespace is
            ignored.

    Returns:
        The playlist ID.

    Raises:
        ValueError: If no playlist ID can be found in ``s``.
    """
    s = s.strip()
    if re.match(r"^[A-Za-z0-9]{22}$", s):
        return s

    if s.startswith("spotify:"):
        parts = s.split(":")
        # The ID is the segment right after "playlist"; an empty one is
        # rejected instead of being returned as "".
        if "playlist" in parts[:-1]:
            candidate = parts[parts.index("playlist") + 1]
            if candidate:
                return candidate
    elif "open.spotify.com" in s:
        # Without a scheme urlparse() puts the host into the path, so add one.
        target = s if "://" in s else f"https://{s}"
        segments = [p for p in urlparse(target).path.split("/") if p]
        # Look for "playlist" anywhere in the path rather than only first.
        if "playlist" in segments[:-1]:
            return segments[segments.index("playlist") + 1]

    raise ValueError(f"Unrecognized playlist format: {s}")
def iter_playlist_tracks(playlist_id: str, token: str, max_pages=None):
    """Yield the ``track`` object of every item in a playlist, page by page.

    Args:
        playlist_id: ID of the playlist to read.
        token: Bearer access token.
        max_pages: Optional cap on the number of pages fetched; a falsy
            value means no cap.

    Yields:
        The ``track`` value of each playlist item, restricted to the fields
        requested below. The value can be ``None``, so callers should skip
        falsy results.
    """
    fields = "items(track(name,id,artists(name,id),album(name,release_date,release_date_precision))),next"
    url = f"/playlists/{playlist_id}/tracks"
    params = {"limit": 100, "fields": fields}
    page = 0
    while True:
        data = request_spotify("GET", url, token, params=params)
        # Tolerate a null "items" value and null entries inside the list.
        for it in data.get("items") or []:
            yield (it or {}).get("track")
        page += 1
        next_url = data.get("next")
        if not next_url:
            break
        if max_pages and page >= max_pages:
            break
        url = next_url
        params = None  # the "next" URL already carries the query string
        time.sleep(0.05)
def get_artist_genres(artist_id: str, token: str):
    """Return the ``genres`` value of the artist's API record.

    An empty list is returned when the field is missing, null or empty.
    """
    artist = request_spotify("GET", f"/artists/{artist_id}", token)
    genres = artist.get("genres", [])
    if not genres:
        return []
    return genres
def detect_lang(title: str, artist: str) -> str:
    """Classify a track as ``"Spanish"`` or ``"English"``.

    The title and artist are joined and passed to ``detect``. A detected
    code starting with ``es`` or ``en`` decides the answer. If detection
    fails or reports any other language, the text counts as Spanish when
    it contains an accented vowel or ``ñ``, and as English otherwise.
    """
    combined = f"{title} {artist}".strip()
    labels_by_prefix = (("es", "Spanish"), ("en", "English"))
    try:
        detected = detect(combined)
        for prefix, label in labels_by_prefix:
            if detected.startswith(prefix):
                return label
    except Exception:
        # Detection is best-effort; fall through to the heuristic below.
        pass
    # Heuristic based on Spanish accented characters.
    has_spanish_chars = re.search(r"[áéíóúñÁÉÍÓÚÑ]", combined) is not None
    return "Spanish" if has_spanish_chars else "English"
| def year_from_release_date(rd: str, precision: str) -> int | None: | |
| if not rd: | |
| return None | |
| try: | |
| if precision == "year": | |
| return int(rd) | |
| return int(rd[:4]) | |
| except Exception: | |
| return None | |
def _cached_artist_genres(artist_id, token, cache):
    """Return an artist's genres, querying the API only for artists not yet in ``cache``."""
    if artist_id in cache:
        return cache[artist_id]
    try:
        genres = get_artist_genres(artist_id, token)
    except Exception:
        # Genres are best-effort metadata: a failed lookup must not drop the
        # track. The failure is not cached, so a later track may retry it.
        genres = None
    time.sleep(0.02)  # pause only after a real request
    if genres is None:
        return []
    cache[artist_id] = genres
    return genres


def _iter_playlist_rows(pid, token, yr_min, yr_max, fetch_genres, seen, genre_cache):
    """Yield one output row per new in-range track of playlist ``pid``."""
    data = request_spotify("GET", f"/playlists/{pid}", token, params={"fields": "name,id"})
    pl_name = (data or {}).get("name", "")
    for tr in iter_playlist_tracks(pid, token):
        if not tr:
            continue
        title = (tr.get("name") or "").strip()
        artists = tr.get("artists") or []
        primary = artists[0] if artists else {}
        artist_name = (primary.get("name") or "").strip()
        album = tr.get("album") or {}
        year = year_from_release_date(
            album.get("release_date"),
            album.get("release_date_precision", "day"),
        )
        if year is None or not (yr_min <= year <= yr_max):
            continue

        # Deduplicate across playlists on (title, artist, year), ignoring case.
        key = (title.lower(), artist_name.lower(), year)
        if key in seen:
            continue
        seen.add(key)

        genres = []
        if fetch_genres and primary.get("id"):
            genres = _cached_artist_genres(primary["id"], token, genre_cache)

        yield {
            "title": title,
            "artist": artist_name,
            "year": year,
            "language": detect_lang(title, artist_name),
            "artist_genres": "|".join(genres),
            "playlist_name": pl_name,
            "playlist_id": pid,
            "track_id": tr.get("id"),
        }


def collect_from_playlists(playlist_ids, token, yr_min=1980, yr_max=1999, fetch_genres=True):
    """Collect de-duplicated track rows from several playlists.

    Args:
        playlist_ids: Iterable of playlist IDs to read.
        token: Bearer access token.
        yr_min: Lowest album release year to keep (inclusive).
        yr_max: Highest album release year to keep (inclusive).
        fetch_genres: When true, look up the primary artist's genres. Each
            artist is requested once per call and then served from a cache.

    Returns:
        A list of dicts with the keys ``title``, ``artist``, ``year``,
        ``language``, ``artist_genres`` (joined with ``|``),
        ``playlist_name``, ``playlist_id`` and ``track_id``. Tracks with an
        unknown or out-of-range year are skipped. A track is attributed to
        the first playlist in which it is seen.

    A playlist whose API requests fail with ``RuntimeError`` is reported on
    stdout and skipped, so one bad playlist does not discard rows that were
    already collected.
    """
    rows = []
    seen = set()
    genre_cache = {}
    for pid in playlist_ids:
        try:
            for row in _iter_playlist_rows(
                pid, token, yr_min, yr_max, fetch_genres, seen, genre_cache
            ):
                rows.append(row)
        except RuntimeError as exc:
            # request_spotify raises RuntimeError for HTTP errors and for
            # exhausted retries; rows gathered before the failure are kept.
            print(f"Skipping playlist {pid}: {exc}")
    return rows
def main():
    """Command-line entry point: search playlists, collect tracks, write a CSV.

    Credentials come from ``--client-id`` / ``--client-secret`` or from the
    environment variables ``CLIENT_ID_SPOTIFY`` / ``CLIENT_SECRET_SPOTIFY``
    (``SPOTIFY_CLIENT_ID`` / ``SPOTIFY_CLIENT_SECRET`` are accepted too).
    Exits through ``SystemExit`` when either credential is missing.
    """
    parser = argparse.ArgumentParser(description="Spotify data extractor → CSV (80s/90s rock)")
    parser.add_argument(
        "--client-id",
        default=os.getenv("CLIENT_ID_SPOTIFY") or os.getenv("SPOTIFY_CLIENT_ID"),
        help="Spotify Client ID",
    )
    parser.add_argument(
        "--client-secret",
        default=(
            os.getenv("CLIENT_SECRET_SPOTIFY")
            or os.getenv("SPOTIFY_CLIENT_SECRET")
            # Misspelled name read by earlier versions of this script; still
            # honoured so existing setups keep working.
            or os.getenv("CLIENT_SECRET_SPOTIFy")
        ),
        help="Spotify Client Secret",
    )
    parser.add_argument("--queries", default="80s rock;90s rock;rock en español 80;rock en español 90;rock and roll classics",
                        help="Consultas de búsqueda de playlists separadas por ';'")
    parser.add_argument("--limit-playlists", type=int, default=10, help="Máx playlists por consulta")
    parser.add_argument("--pages-per-query", type=int, default=2, help="Páginas de búsqueda por consulta")
    parser.add_argument("--extra-playlists", default="https://open.spotify.com/playlist/37i9dQZF1DXc3KygMa1OE7,https://open.spotify.com/playlist/37i9dQZF1DX1spT6G94GFC", help="IDs/URLs de playlists separadas por ','")
    parser.add_argument("--min-year", type=int, default=1980)
    parser.add_argument("--max-year", type=int, default=1999)
    parser.add_argument("--no-genres", action="store_true", help="No consultar géneros del artista (más rápido)")
    parser.add_argument("--out", default="spotify_rock_80s_90s.csv")
    args = parser.parse_args()

    if not args.client_id or not args.client_secret:
        raise SystemExit(
            "Set --client-id/--client-secret or env CLIENT_ID_SPOTIFY / CLIENT_SECRET_SPOTIFY "
            "(SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET are also accepted)"
        )
    token = get_access_token(args.client_id, args.client_secret)

    # A dict is used as an ordered set: duplicates collapse, but playlists
    # are processed in discovery order, so repeated runs attribute a
    # de-duplicated track to the same playlist.
    playlist_ids = {}

    # 1) Find playlists through the search queries.
    for q in [q.strip() for q in args.queries.split(";") if q.strip()]:
        pls = search_playlists(q, token, limit=args.limit_playlists, pages=args.pages_per_query)
        for p in pls:
            playlist_ids[p["id"]] = None

    # 2) Add the extra playlists (URLs or IDs), ignoring empty entries such
    #    as those produced by a trailing or doubled comma.
    for s in args.extra_playlists.split(","):
        if s.strip():
            playlist_ids[parse_playlist_id(s)] = None

    playlist_ids = list(playlist_ids)
    print(f"Total playlists a procesar: {len(playlist_ids)}")

    # 3) Collect the tracks.
    rows = collect_from_playlists(
        playlist_ids,
        token,
        yr_min=args.min_year,
        yr_max=args.max_year,
        fetch_genres=not args.no_genres,
    )

    # 4) Write the CSV.
    df = pd.DataFrame(rows, columns=[
        "title", "artist", "year", "language", "artist_genres", "playlist_name", "playlist_id", "track_id"
    ])
    df.drop_duplicates(subset=["title", "artist", "year"], inplace=True)
    df.to_csv(args.out, index=False)
    print(f"Escrito {args.out} con {len(df)} filas.")


if __name__ == "__main__":
    main()