Spaces:
Sleeping
Sleeping
File size: 9,446 Bytes
fe846c2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
import os
import time
import base64
import argparse
import re
from urllib.parse import urlparse
import requests
import pandas as pd
from tqdm import tqdm
from langdetect import detect, DetectorFactory
DetectorFactory.seed = 0 # resultados deterministas
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
def env(var, default=None, required=False):
v = os.getenv(var, default)
if required and not v:
raise SystemExit(f"Missing required env var: {var}")
return v
def get_access_token(client_id: str, client_secret: str) -> str:
auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
headers = {"Authorization": f"Basic {auth}", "Content-Type": "application/x-www-form-urlencoded"}
data = {"grant_type": "client_credentials"}
r = requests.post(SPOTIFY_TOKEN_URL, headers=headers, data=data, timeout=30)
r.raise_for_status()
return r.json()["access_token"]
def request_spotify(method: str, path: str, token: str, params=None, max_retries=5):
url = path if path.startswith("http") else f"{SPOTIFY_API_BASE}{path}"
headers = {"Authorization": f"Bearer {token}"}
backoff = 1.0
for attempt in range(max_retries):
r = requests.request(method, url, headers=headers, params=params, timeout=30)
if r.status_code == 429:
retry_after = int(r.headers.get("Retry-After", "1"))
time.sleep(retry_after + 0.1)
continue
if 200 <= r.status_code < 300:
return r.json()
if r.status_code >= 500:
time.sleep(backoff)
backoff = min(backoff * 2, 10)
continue
# errores 4xx (no 429)
try:
detail = r.json()
except Exception:
detail = r.text
raise RuntimeError(f"Spotify API {r.status_code}: {detail}")
raise RuntimeError("Max retries exceeded for Spotify API")
def search_playlists(query: str, token: str, limit=20, pages=3):
"""Busca playlists por texto. Devuelve lista de dicts (id, name, owner)."""
results = []
offset = 0
for _ in range(pages):
data = request_spotify(
"GET", "/search", token,
params={"q": query, "type": "playlist", "limit": limit, "offset": offset}
)
items = data.get("playlists", {}).get("items", [])
for it in items:
if it is None:
continue
results.append({
"id": it["id"],
"name": it.get("name", ""),
"owner": (it.get("owner") or {}).get("display_name", ""),
})
if len(items) < limit:
break
offset += limit
time.sleep(0.1)
return results
def parse_playlist_id(s: str) -> str:
"""Acepta ID, URL https://open.spotify.com/playlist/{id} o spotify:playlist:{id}."""
s = s.strip()
if re.match(r"^[A-Za-z0-9]{22}$", s):
return s
if s.startswith("spotify:playlist:"):
return s.split(":")[-1]
if "open.spotify.com" in s:
path = urlparse(s).path
parts = [p for p in path.split("/") if p]
if len(parts) >= 2 and parts[0] == "playlist":
return parts[1]
raise ValueError(f"Unrecognized playlist format: {s}")
def iter_playlist_tracks(playlist_id: str, token: str, max_pages=None):
"""Itera todas las pistas de una playlist (paginado)."""
fields = "items(track(name,id,artists(name,id),album(name,release_date,release_date_precision))),next"
url = f"/playlists/{playlist_id}/tracks"
params = {"limit": 100, "fields": fields}
total = 0
page = 0
while True:
data = request_spotify("GET", url, token, params=params)
items = data.get("items", [])
for it in items:
yield it.get("track")
total += len(items)
page += 1
next_url = data.get("next")
if not next_url:
break
if max_pages and page >= max_pages:
break
url = next_url
params = None # next ya incluye query
time.sleep(0.05)
def get_artist_genres(artist_id: str, token: str):
data = request_spotify("GET", f"/artists/{artist_id}", token)
return data.get("genres", []) or []
def detect_lang(title: str, artist: str) -> str:
text = f"{title} {artist}".strip()
try:
code = detect(text)
if code.startswith("es"):
return "Spanish"
if code.startswith("en"):
return "English"
except Exception:
pass
# heurística por tildes
if re.search(r"[áéíóúñÁÉÍÓÚÑ]", text):
return "Spanish"
return "English"
def year_from_release_date(rd: str, precision: str) -> int | None:
if not rd:
return None
try:
if precision == "year":
return int(rd)
return int(rd[:4])
except Exception:
return None
def collect_from_playlists(playlist_ids, token, yr_min=1980, yr_max=1999, fetch_genres=True):
rows = []
seen = set()
for pid in playlist_ids:
data = request_spotify("GET", f"/playlists/{pid}", token, params={"fields": "name,id"})
pl_name = data.get("name", "")
for tr in iter_playlist_tracks(pid, token):
if not tr:
continue
track_id = tr.get("id")
title = (tr.get("name") or "").strip()
artists = tr.get("artists") or []
primary = artists[0] if artists else {}
artist_name = (primary.get("name") or "").strip()
album = tr.get("album") or {}
rd = album.get("release_date")
rdp = album.get("release_date_precision", "day")
year = year_from_release_date(rd, rdp)
if year is None or not (yr_min <= year <= yr_max):
continue
# key de dedup
key = (title.lower(), artist_name.lower(), year)
if key in seen:
continue
seen.add(key)
# géneros del artista
genres = []
if fetch_genres and primary.get("id"):
try:
genres = get_artist_genres(primary["id"], token)
except Exception:
genres = []
time.sleep(0.02)
language = detect_lang(title, artist_name)
rows.append({
"title": title,
"artist": artist_name,
"year": year,
"language": language,
"artist_genres": "|".join(genres),
"playlist_name": pl_name,
"playlist_id": pid,
"track_id": track_id,
})
return rows
def main():
parser = argparse.ArgumentParser(description="Spotify data extractor → CSV (80s/90s rock)")
parser.add_argument("--client-id", default=os.getenv("CLIENT_ID_SPOTIFY"), help="Spotify Client ID")
parser.add_argument("--client-secret", default=os.getenv("CLIENT_SECRET_SPOTIFy"), help="Spotify Client Secret")
parser.add_argument("--queries", default="80s rock;90s rock;rock en español 80;rock en español 90;rock and roll classics",
help="Consultas de búsqueda de playlists separadas por ';'")
parser.add_argument("--limit-playlists", type=int, default=10, help="Máx playlists por consulta")
parser.add_argument("--pages-per-query", type=int, default=2, help="Páginas de búsqueda por consulta")
parser.add_argument("--extra-playlists", default="https://open.spotify.com/playlist/37i9dQZF1DXc3KygMa1OE7,https://open.spotify.com/playlist/37i9dQZF1DX1spT6G94GFC", help="IDs/URLs de playlists separadas por ','")
parser.add_argument("--min-year", type=int, default=1980)
parser.add_argument("--max-year", type=int, default=1999)
parser.add_argument("--no-genres", action="store_true", help="No consultar géneros del artista (más rápido)")
parser.add_argument("--out", default="spotify_rock_80s_90s.csv")
args = parser.parse_args()
if not args.client_id or not args.client_secret:
raise SystemExit("Set --client-id/--client-secret or env SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET")
token = get_access_token(args.client_id, args.client_secret)
# 1) Buscar playlists por consultas
playlist_ids = set()
for q in [q.strip() for q in args.queries.split(";") if q.strip()]:
pls = search_playlists(q, token, limit=args.limit_playlists, pages=args.pages_per_query)
for p in pls:
playlist_ids.add(p["id"])
# 2) Añadir playlists extra (URLs o IDs)
if args.extra_playlists.strip():
for s in args.extra_playlists.split(","):
playlist_ids.add(parse_playlist_id(s))
playlist_ids = list(playlist_ids)
print(f"Total playlists a procesar: {len(playlist_ids)}")
# 3) Coleccionar pistas
rows = collect_from_playlists(
playlist_ids,
token,
yr_min=args.min_year,
yr_max=args.max_year,
fetch_genres=not args.no_genres
)
# 4) Guardar CSV
df = pd.DataFrame(rows, columns=[
"title","artist","year","language","artist_genres","playlist_name","playlist_id","track_id"
])
df.drop_duplicates(subset=["title","artist","year"], inplace=True)
df.to_csv(args.out, index=False)
print(f"Escrito {args.out} con {len(df)} filas.")
if __name__ == "__main__":
main()
|