rock_chat / code /get_spotipy.py
smitharauco's picture
RAG Rock API
fe846c2
import os
import time
import base64
import argparse
import re
from urllib.parse import urlparse
import requests
import pandas as pd
from tqdm import tqdm
from langdetect import detect, DetectorFactory
# Pin langdetect's seed so repeated runs give the same detection results.
DetectorFactory.seed = 0 # deterministic results
# Endpoint that get_access_token posts the client credentials to.
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
# Prefix that request_spotify prepends to relative API paths.
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
def env(var, default=None, required=False):
    """Look up an environment variable, optionally insisting that it is set.

    Args:
        var: Name of the environment variable.
        default: Value returned when the variable is not defined.
        required: When true, an unset or empty result aborts the program.

    Returns:
        The variable's value, or ``default`` when it is not defined.

    Raises:
        SystemExit: If ``required`` is true and the resolved value is falsy.
    """
    value = os.environ.get(var, default)
    if value or not required:
        return value
    raise SystemExit(f"Missing required env var: {var}")
def get_access_token(client_id: str, client_secret: str) -> str:
    """Request an access token using the client-credentials grant.

    Args:
        client_id: Spotify application client ID.
        client_secret: Spotify application client secret.

    Returns:
        The ``access_token`` field of the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
    """
    # The endpoint expects "id:secret" base64-encoded in a Basic auth header.
    credentials = f"{client_id}:{client_secret}".encode()
    basic = base64.b64encode(credentials).decode()
    response = requests.post(
        SPOTIFY_TOKEN_URL,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={"grant_type": "client_credentials"},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    return payload["access_token"]
def _retry_after_seconds(response, default=1):
    """Return the whole-second delay from a response's Retry-After header.

    Falls back to ``default`` when the header is missing or is not a plain
    integer (for example the HTTP-date form), and never returns a negative
    value, so the result is always safe to pass to ``time.sleep``.
    """
    raw = response.headers.get("Retry-After")
    try:
        seconds = int(raw)
    except (TypeError, ValueError):
        return default
    return max(seconds, 0)


def request_spotify(method: str, path: str, token: str, params=None, max_retries=5):
    """Call the Spotify Web API, retrying on rate limits and server errors.

    Args:
        method: HTTP method name passed to ``requests.request``.
        path: Either an absolute URL (anything starting with ``http``) or a
            path that is appended to ``SPOTIFY_API_BASE``.
        token: Bearer token sent in the ``Authorization`` header.
        params: Optional query parameters.
        max_retries: Maximum number of HTTP attempts.

    Returns:
        The decoded JSON body of the first 2xx response.

    Raises:
        RuntimeError: On a 4xx response other than 429 (the message carries
            the status code and response body), or when every attempt was
            consumed by 429/5xx responses.
    """
    url = path if path.startswith("http") else f"{SPOTIFY_API_BASE}{path}"
    headers = {"Authorization": f"Bearer {token}"}
    backoff = 1.0
    for _ in range(max_retries):
        r = requests.request(method, url, headers=headers, params=params, timeout=30)
        if r.status_code == 429:
            # Rate limited: wait as long as the server asks, plus a small
            # margin. An unparsable header must not abort the whole run.
            time.sleep(_retry_after_seconds(r) + 0.1)
            continue
        if 200 <= r.status_code < 300:
            return r.json()
        if r.status_code >= 500:
            # Server-side failure: exponential backoff capped at 10 seconds.
            time.sleep(backoff)
            backoff = min(backoff * 2, 10)
            continue
        # Remaining 4xx errors (not 429) are not retryable.
        try:
            detail = r.json()
        except Exception:
            detail = r.text
        raise RuntimeError(f"Spotify API {r.status_code}: {detail}")
    raise RuntimeError("Max retries exceeded for Spotify API")
def search_playlists(query: str, token: str, limit=20, pages=3):
    """Search playlists by free text.

    Args:
        query: Search text.
        token: Bearer token for the API.
        limit: Page size requested from the search endpoint.
        pages: Maximum number of result pages to fetch.

    Returns:
        A list of dicts with the keys ``id``, ``name`` and ``owner`` (the
        owner's display name). ``None`` entries in the response are skipped.
    """
    found = []
    for page_index in range(pages):
        payload = request_spotify(
            "GET",
            "/search",
            token,
            params={
                "q": query,
                "type": "playlist",
                "limit": limit,
                "offset": page_index * limit,
            },
        )
        page_items = payload.get("playlists", {}).get("items", [])
        found.extend(
            {
                "id": entry["id"],
                "name": entry.get("name", ""),
                "owner": (entry.get("owner") or {}).get("display_name", ""),
            }
            for entry in page_items
            if entry is not None
        )
        # A short page means there are no further results to request.
        if len(page_items) < limit:
            break
        time.sleep(0.1)
    return found
def parse_playlist_id(s: str) -> str:
    """Extract a playlist ID from a bare ID, a Spotify URI or a web URL.

    Accepted forms:
        * a bare 22-character alphanumeric ID;
        * a ``spotify:playlist:{id}`` URI;
        * an ``open.spotify.com`` URL whose path contains ``playlist/{id}``,
          with or without a scheme, extra leading path segments (such as a
          locale prefix like ``/intl-es/``) or a query string.

    Args:
        s: The playlist reference; surrounding whitespace is ignored.

    Returns:
        The playlist ID.

    Raises:
        ValueError: If ``s`` matches none of the accepted forms.
    """
    s = s.strip()
    if re.match(r"^[A-Za-z0-9]{22}$", s):
        return s
    if s.startswith("spotify:playlist:"):
        candidate = s.split(":")[-1]
        # An empty trailing segment ("spotify:playlist:") is not an ID.
        if candidate:
            return candidate
    elif "open.spotify.com" in s:
        # urlparse only separates host from path when a scheme is present.
        target = s if "://" in s else f"https://{s}"
        parts = [p for p in urlparse(target).path.split("/") if p]
        # Locate the "playlist" segment wherever it sits so that prefixed
        # paths (locale, /embed/, legacy /user/{name}/) are accepted too.
        if "playlist" in parts:
            idx = parts.index("playlist")
            if idx + 1 < len(parts):
                return parts[idx + 1]
    raise ValueError(f"Unrecognized playlist format: {s}")
def iter_playlist_tracks(playlist_id: str, token: str, max_pages=None):
    """Yield the ``track`` object of every item in a playlist, page by page.

    Args:
        playlist_id: ID of the playlist to read.
        token: Bearer token for the API.
        max_pages: Optional cap on the number of pages fetched; a falsy
            value means no cap.

    Yields:
        The ``track`` value of each playlist item (may be ``None``).
    """
    wanted = "items(track(name,id,artists(name,id),album(name,release_date,release_date_precision))),next"
    target = f"/playlists/{playlist_id}/tracks"
    query = {"limit": 100, "fields": wanted}
    pages_done = 0
    while True:
        payload = request_spotify("GET", target, token, params=query)
        for entry in payload.get("items", []):
            yield entry.get("track")
        pages_done += 1
        following = payload.get("next")
        if not following or (max_pages and pages_done >= max_pages):
            return
        # The "next" link already carries its own query string.
        target, query = following, None
        time.sleep(0.05)
def get_artist_genres(artist_id: str, token: str):
    """Fetch an artist and return its ``genres`` list.

    Args:
        artist_id: ID of the artist to look up.
        token: Bearer token for the API.

    Returns:
        The ``genres`` value of the artist response, or an empty list when
        it is missing or falsy.
    """
    artist = request_spotify("GET", f"/artists/{artist_id}", token)
    genres = artist.get("genres", [])
    return genres if genres else []
def detect_lang(title: str, artist: str) -> str:
    """Classify a track as ``"Spanish"`` or ``"English"``.

    The title and artist are joined and passed to ``detect``. A detected
    code starting with ``es`` or ``en`` decides the label. Any other code,
    or any error raised during detection, falls back to a heuristic: text
    containing an accented vowel or ``ñ`` counts as Spanish, everything
    else as English.

    Args:
        title: Track title.
        artist: Artist name.

    Returns:
        ``"Spanish"`` or ``"English"``.
    """
    sample = f"{title} {artist}".strip()
    labels = (("es", "Spanish"), ("en", "English"))
    try:
        detected = detect(sample)
        for prefix, label in labels:
            if detected.startswith(prefix):
                return label
    except Exception:
        # Best effort: a detection failure just means we use the heuristic.
        pass
    # Heuristic based on Spanish accent marks.
    return "Spanish" if re.search(r"[áéíóúñÁÉÍÓÚÑ]", sample) else "English"
def year_from_release_date(rd: str, precision: str) -> int | None:
if not rd:
return None
try:
if precision == "year":
return int(rd)
return int(rd[:4])
except Exception:
return None
def collect_from_playlists(playlist_ids, token, yr_min=1980, yr_max=1999, fetch_genres=True):
    """Collect de-duplicated track rows from a set of playlists.

    For each playlist the name is fetched, then every track is visited.
    Tracks without a parsable album year, or with a year outside
    ``[yr_min, yr_max]``, are skipped. Tracks are de-duplicated on
    (lower-cased title, lower-cased primary artist, year); the first
    occurrence wins.

    Args:
        playlist_ids: Iterable of playlist IDs to process, in order.
        token: Bearer token for the API.
        yr_min: Earliest release year to keep (inclusive).
        yr_max: Latest release year to keep (inclusive).
        fetch_genres: When true, look up the primary artist's genres.

    Returns:
        A list of dicts with the keys ``title``, ``artist``, ``year``,
        ``language``, ``artist_genres`` (``|``-joined), ``playlist_name``,
        ``playlist_id`` and ``track_id``.
    """
    rows = []
    seen = set()
    # Genres belong to the artist, not the track, so successful lookups are
    # remembered here to avoid one HTTP request per track by the same artist.
    genre_cache = {}
    for pid in playlist_ids:
        data = request_spotify("GET", f"/playlists/{pid}", token, params={"fields": "name,id"})
        pl_name = data.get("name", "")
        for tr in iter_playlist_tracks(pid, token):
            if not tr:
                continue
            track_id = tr.get("id")
            title = (tr.get("name") or "").strip()
            artists = tr.get("artists") or []
            primary = artists[0] if artists else {}
            artist_name = (primary.get("name") or "").strip()
            album = tr.get("album") or {}
            rd = album.get("release_date")
            rdp = album.get("release_date_precision", "day")
            year = year_from_release_date(rd, rdp)
            if year is None or not (yr_min <= year <= yr_max):
                continue
            # De-duplication key.
            key = (title.lower(), artist_name.lower(), year)
            if key in seen:
                continue
            seen.add(key)
            # Genres of the primary artist.
            genres = []
            artist_id = primary.get("id")
            if fetch_genres and artist_id:
                if artist_id in genre_cache:
                    genres = genre_cache[artist_id]
                else:
                    try:
                        genres = get_artist_genres(artist_id, token)
                    except Exception:
                        # Best effort: a failed lookup leaves the genres
                        # empty and is not cached, so a later track by the
                        # same artist tries again.
                        genres = []
                    else:
                        genre_cache[artist_id] = genres
                    # Throttle only after a real request.
                    time.sleep(0.02)
            language = detect_lang(title, artist_name)
            rows.append({
                "title": title,
                "artist": artist_name,
                "year": year,
                "language": language,
                "artist_genres": "|".join(genres),
                "playlist_name": pl_name,
                "playlist_id": pid,
                "track_id": track_id,
            })
    return rows
def _first_env(*names):
    """Return the first non-empty value among the named environment variables."""
    for name in names:
        value = os.getenv(name)
        if value:
            return value
    return None


def main():
    """Command-line entry point: search playlists, collect tracks, write a CSV.

    Steps:
        1. Obtain an access token from the client ID and secret.
        2. Search playlists for each ``;``-separated query.
        3. Add the ``,``-separated extra playlists (IDs, URIs or URLs).
        4. Collect the tracks within the requested year range.
        5. Write the rows to the ``--out`` CSV file.

    Raises:
        SystemExit: If the client ID or secret is missing.
    """
    parser = argparse.ArgumentParser(description="Spotify data extractor → CSV (80s/90s rock)")
    parser.add_argument(
        "--client-id",
        default=_first_env("CLIENT_ID_SPOTIFY", "SPOTIFY_CLIENT_ID"),
        help="Spotify Client ID",
    )
    parser.add_argument(
        "--client-secret",
        # "CLIENT_SECRET_SPOTIFy" is the misspelled name this script used to
        # read; it is still honoured so existing setups keep working.
        default=_first_env("CLIENT_SECRET_SPOTIFY", "CLIENT_SECRET_SPOTIFy", "SPOTIFY_CLIENT_SECRET"),
        help="Spotify Client Secret",
    )
    parser.add_argument("--queries", default="80s rock;90s rock;rock en español 80;rock en español 90;rock and roll classics",
                        help="Consultas de búsqueda de playlists separadas por ';'")
    # NOTE: this value is passed to search_playlists as the page size, so a
    # query can yield up to limit-playlists x pages-per-query playlists.
    parser.add_argument("--limit-playlists", type=int, default=10, help="Máx playlists por consulta")
    parser.add_argument("--pages-per-query", type=int, default=2, help="Páginas de búsqueda por consulta")
    parser.add_argument("--extra-playlists", default="https://open.spotify.com/playlist/37i9dQZF1DXc3KygMa1OE7,https://open.spotify.com/playlist/37i9dQZF1DX1spT6G94GFC", help="IDs/URLs de playlists separadas por ','")
    parser.add_argument("--min-year", type=int, default=1980)
    parser.add_argument("--max-year", type=int, default=1999)
    parser.add_argument("--no-genres", action="store_true", help="No consultar géneros del artista (más rápido)")
    parser.add_argument("--out", default="spotify_rock_80s_90s.csv")
    args = parser.parse_args()
    if not args.client_id or not args.client_secret:
        raise SystemExit(
            "Set --client-id/--client-secret or env CLIENT_ID_SPOTIFY / CLIENT_SECRET_SPOTIFY "
            "(SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET are also accepted)"
        )
    token = get_access_token(args.client_id, args.client_secret)
    # A dict is used as an insertion-ordered set. Iterating a plain set of
    # strings can change order between runs, and because de-duplication keeps
    # the first occurrence, that would change which playlist a track is
    # attributed to.
    playlist_ids = {}
    # 1) Find playlists through the search queries.
    for q in [q.strip() for q in args.queries.split(";") if q.strip()]:
        pls = search_playlists(q, token, limit=args.limit_playlists, pages=args.pages_per_query)
        for p in pls:
            playlist_ids[p["id"]] = None
    # 2) Add the extra playlists (URLs or IDs), ignoring empty entries such
    #    as the one produced by a trailing comma.
    for s in args.extra_playlists.split(","):
        s = s.strip()
        if s:
            playlist_ids[parse_playlist_id(s)] = None
    playlist_ids = list(playlist_ids)
    print(f"Total playlists a procesar: {len(playlist_ids)}")
    # 3) Collect the tracks.
    rows = collect_from_playlists(
        playlist_ids,
        token,
        yr_min=args.min_year,
        yr_max=args.max_year,
        fetch_genres=not args.no_genres,
    )
    # 4) Write the CSV.
    df = pd.DataFrame(rows, columns=[
        "title", "artist", "year", "language", "artist_genres", "playlist_name", "playlist_id", "track_id"
    ])
    df.drop_duplicates(subset=["title", "artist", "year"], inplace=True)
    df.to_csv(args.out, index=False)
    print(f"Escrito {args.out} con {len(df)} filas.")
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()