# File size: 9,446 Bytes
# fe846c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import os
import time
import base64
import argparse
import re
from urllib.parse import urlparse
import requests
import pandas as pd
from tqdm import tqdm
from langdetect import detect, DetectorFactory

DetectorFactory.seed = 0  # fixed seed so langdetect results are deterministic

# OAuth token endpoint that get_access_token posts the client credentials to.
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
# Base URL that request_spotify prepends to relative API paths.
SPOTIFY_API_BASE = "https://api.spotify.com/v1"

def env(var, default=None, required=False):
    """Read environment variable ``var``, falling back to ``default``.

    When ``required`` is true and the resolved value is missing or empty,
    the program exits via ``SystemExit`` with a message naming the variable.
    """
    value = os.getenv(var, default)
    if not required or value:
        return value
    raise SystemExit(f"Missing required env var: {var}")

def get_access_token(client_id: str, client_secret: str) -> str:
    """Obtain an access token using the client-credentials grant.

    Posts the Basic-auth encoded ``client_id:client_secret`` pair to
    SPOTIFY_TOKEN_URL and returns the ``access_token`` field of the JSON
    response. HTTP error statuses propagate from ``raise_for_status``.
    """
    credentials = f"{client_id}:{client_secret}".encode()
    basic = base64.b64encode(credentials).decode()
    response = requests.post(
        SPOTIFY_TOKEN_URL,
        headers={
            "Authorization": f"Basic {basic}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={"grant_type": "client_credentials"},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    return payload["access_token"]

def request_spotify(method: str, path: str, token: str, params=None, max_retries=5):
    """Call the Spotify Web API and return the decoded JSON body.

    Args:
        method: HTTP verb, e.g. ``"GET"``.
        path: Path relative to SPOTIFY_API_BASE, or an absolute URL (anything
            starting with ``"http"``, such as a pagination ``next`` link).
        token: Bearer access token.
        params: Optional query parameters.
        max_retries: Total number of attempts before giving up.

    Returns:
        The parsed JSON of the first 2xx response.

    Raises:
        RuntimeError: On a 4xx status other than 429, or when every attempt
            ended in a 429 or 5xx response.
    """
    url = path if path.startswith("http") else f"{SPOTIFY_API_BASE}{path}"
    headers = {"Authorization": f"Bearer {token}"}
    backoff = 1.0
    for _ in range(max_retries):
        r = requests.request(method, url, headers=headers, params=params, timeout=30)
        if r.status_code == 429:
            # Retry-After is normally a whole number of seconds, but the HTTP
            # spec also allows other forms; fall back to 1 s instead of
            # crashing with ValueError, and never pass a negative to sleep().
            try:
                retry_after = max(int(r.headers.get("Retry-After", "1")), 0)
            except ValueError:
                retry_after = 1
            time.sleep(retry_after + 0.1)
            continue
        if 200 <= r.status_code < 300:
            return r.json()
        if r.status_code >= 500:
            # Server-side error: exponential backoff capped at 10 seconds.
            time.sleep(backoff)
            backoff = min(backoff * 2, 10)
            continue
        # 4xx errors (other than 429) are not retried.
        try:
            detail = r.json()
        except Exception:
            # Body was not JSON; report the raw text instead.
            detail = r.text
        raise RuntimeError(f"Spotify API {r.status_code}: {detail}")
    raise RuntimeError("Max retries exceeded for Spotify API")

def search_playlists(query: str, token: str, limit=20, pages=3):
    """Search playlists by free text.

    Requests up to ``pages`` result pages of ``limit`` entries each and
    returns a list of dicts with the keys ``id``, ``name`` and ``owner``.
    Null entries in a result page are skipped; paging stops early once a
    page comes back with fewer than ``limit`` entries.
    """
    found = []
    for page_index in range(pages):
        payload = request_spotify(
            "GET", "/search", token,
            params={
                "q": query,
                "type": "playlist",
                "limit": limit,
                "offset": page_index * limit,
            },
        )
        page_items = payload.get("playlists", {}).get("items", [])
        found.extend(
            {
                "id": entry["id"],
                "name": entry.get("name", ""),
                "owner": (entry.get("owner") or {}).get("display_name", ""),
            }
            for entry in page_items
            if entry is not None
        )
        if len(page_items) < limit:
            break
        # Short pause between consecutive search pages.
        time.sleep(0.1)
    return found

def parse_playlist_id(s: str) -> str:
    """Extract a playlist ID from a bare ID, a Spotify URI, or a web URL.

    Accepted forms (surrounding whitespace is ignored):
      * a bare 22-character alphanumeric ID;
      * a URI such as ``spotify:playlist:{id}`` or
        ``spotify:user:{name}:playlist:{id}``;
      * an ``open.spotify.com`` URL whose path contains ``playlist/{id}``,
        including prefixed variants like ``/intl-es/playlist/{id}`` or
        ``/user/{name}/playlist/{id}``; any query string is ignored.

    Raises:
        ValueError: If no playlist ID can be found in ``s``.
    """
    s = s.strip()
    if re.fullmatch(r"[A-Za-z0-9]{22}", s):
        return s
    if s.startswith("spotify:"):
        segments = s.split(":")
    elif "open.spotify.com" in s:
        segments = [p for p in urlparse(s).path.split("/") if p]
    else:
        segments = []
    # The ID is whatever non-empty segment directly follows "playlist",
    # wherever that marker sits in the URI or URL path.
    for marker, candidate in zip(segments, segments[1:]):
        if marker == "playlist" and candidate:
            return candidate
    raise ValueError(f"Unrecognized playlist format: {s}")

def iter_playlist_tracks(playlist_id: str, token: str, max_pages=None):
    """Yield the ``track`` object of every item in a playlist, page by page.

    Args:
        playlist_id: ID of the playlist to read.
        token: Bearer access token.
        max_pages: Optional cap on the number of pages fetched; a falsy
            value (None or 0) means no cap.

    Yields:
        The ``track`` value of each playlist item as returned by the API
        (restricted to the requested ``fields``). The value may be None, so
        callers should skip falsy entries.
    """
    fields = "items(track(name,id,artists(name,id),album(name,release_date,release_date_precision))),next"
    url = f"/playlists/{playlist_id}/tracks"
    params = {"limit": 100, "fields": fields}
    page = 0
    while True:
        data = request_spotify("GET", url, token, params=params)
        # Tolerate a null "items" value and null entries inside the list.
        for it in data.get("items") or []:
            yield it.get("track") if it else None
        page += 1
        next_url = data.get("next")
        if not next_url:
            break
        if max_pages and page >= max_pages:
            break
        url = next_url
        params = None  # the "next" URL already carries its query string
        time.sleep(0.05)

def get_artist_genres(artist_id: str, token: str):
    """Return the ``genres`` list of an artist, or an empty list if absent or empty."""
    artist = request_spotify("GET", f"/artists/{artist_id}", token)
    genres = artist.get("genres", [])
    if not genres:
        return []
    return genres

def detect_lang(title: str, artist: str) -> str:
    """Classify a track as "Spanish" or "English" from its title and artist.

    The language detector is tried first; if it fails, or reports a language
    other than Spanish/English, the text is treated as Spanish when it
    contains accented vowels or "ñ", and as English otherwise.
    """
    sample = f"{title} {artist}".strip()
    labels = (("es", "Spanish"), ("en", "English"))
    try:
        detected = detect(sample)
        for prefix, label in labels:
            if detected.startswith(prefix):
                return label
    except Exception:
        # Detection is best effort; fall through to the accent heuristic.
        pass
    # Heuristic based on Spanish-specific characters.
    has_spanish_chars = re.search(r"[áéíóúñÁÉÍÓÚÑ]", sample) is not None
    return "Spanish" if has_spanish_chars else "English"

def year_from_release_date(rd: str, precision: str) -> int | None:
    if not rd:
        return None
    try:
        if precision == "year":
            return int(rd)
        return int(rd[:4])
    except Exception:
        return None

def collect_from_playlists(playlist_ids, token, yr_min=1980, yr_max=1999, fetch_genres=True):
    """Collect de-duplicated track rows from the given playlists.

    Args:
        playlist_ids: Iterable of playlist IDs to read.
        token: Bearer access token.
        yr_min: Earliest album release year to keep (inclusive).
        yr_max: Latest album release year to keep (inclusive).
        fetch_genres: When true, look up the genres of each track's first
            listed artist.

    Returns:
        A list of dicts with the keys ``title``, ``artist``, ``year``,
        ``language``, ``artist_genres`` (genres joined with "|"),
        ``playlist_name``, ``playlist_id`` and ``track_id``. A track is
        emitted once per (lower-cased title, lower-cased artist, year) and
        is attributed to the first playlist in which it is seen.
    """
    rows = []
    seen = set()
    # Genres depend only on the artist, so remember successful lookups and
    # avoid one API request per track for artists that appear repeatedly.
    genre_cache = {}
    for pid in playlist_ids:
        data = request_spotify("GET", f"/playlists/{pid}", token, params={"fields": "name,id"})
        pl_name = data.get("name", "")
        for tr in iter_playlist_tracks(pid, token):
            if not tr:
                continue
            track_id = tr.get("id")
            title = (tr.get("name") or "").strip()
            artists = tr.get("artists") or []
            # Use the first listed artist; tolerate a missing or null entry.
            primary = (artists[0] if artists else None) or {}
            artist_name = (primary.get("name") or "").strip()
            album = tr.get("album") or {}
            rd = album.get("release_date")
            rdp = album.get("release_date_precision", "day")
            year = year_from_release_date(rd, rdp)
            if year is None or not (yr_min <= year <= yr_max):
                continue
            # De-duplication key.
            key = (title.lower(), artist_name.lower(), year)
            if key in seen:
                continue
            seen.add(key)
            # Artist genres (best effort).
            genres = []
            artist_id = primary.get("id")
            if fetch_genres and artist_id:
                if artist_id in genre_cache:
                    genres = genre_cache[artist_id]
                else:
                    try:
                        genres = get_artist_genres(artist_id, token)
                    except Exception:
                        # A failed lookup must not abort the collection; it
                        # is not cached, so a later track can retry it.
                        genres = []
                    else:
                        genre_cache[artist_id] = genres
                    # Throttle only when a request was actually attempted.
                    time.sleep(0.02)
            language = detect_lang(title, artist_name)
            rows.append({
                "title": title,
                "artist": artist_name,
                "year": year,
                "language": language,
                "artist_genres": "|".join(genres),
                "playlist_name": pl_name,
                "playlist_id": pid,
                "track_id": track_id,
            })
    return rows

def main():
    """Command-line entry point: find playlists, collect tracks, write a CSV.

    Steps:
      1. Search playlists for each ';'-separated query.
      2. Add the explicitly listed extra playlists (IDs, URIs or URLs).
      3. Collect tracks whose album year lies in [--min-year, --max-year].
      4. Write the rows to the CSV file named by --out.

    Credentials come from --client-id/--client-secret, defaulting to the
    environment variables CLIENT_ID_SPOTIFY / CLIENT_SECRET_SPOTIFY (the
    SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET spellings are accepted too).

    Raises:
        SystemExit: If no client ID or client secret is available.
    """
    # The original code read the secret from the misspelled, case-sensitive
    # name "CLIENT_SECRET_SPOTIFy"; it is still honoured for compatibility.
    default_client_id = os.getenv("CLIENT_ID_SPOTIFY") or os.getenv("SPOTIFY_CLIENT_ID")
    default_client_secret = (
        os.getenv("CLIENT_SECRET_SPOTIFY")
        or os.getenv("CLIENT_SECRET_SPOTIFy")
        or os.getenv("SPOTIFY_CLIENT_SECRET")
    )

    parser = argparse.ArgumentParser(description="Spotify data extractor → CSV (80s/90s rock)")
    parser.add_argument("--client-id", default=default_client_id, help="Spotify Client ID")
    parser.add_argument("--client-secret", default=default_client_secret, help="Spotify Client Secret")
    parser.add_argument("--queries", default="80s rock;90s rock;rock en español 80;rock en español 90;rock and roll classics",
                        help="Consultas de búsqueda de playlists separadas por ';'")
    parser.add_argument("--limit-playlists", type=int, default=10, help="Máx playlists por consulta")
    parser.add_argument("--pages-per-query", type=int, default=2, help="Páginas de búsqueda por consulta")
    parser.add_argument("--extra-playlists", default="https://open.spotify.com/playlist/37i9dQZF1DXc3KygMa1OE7,https://open.spotify.com/playlist/37i9dQZF1DX1spT6G94GFC", help="IDs/URLs de playlists separadas por ','")
    parser.add_argument("--min-year", type=int, default=1980)
    parser.add_argument("--max-year", type=int, default=1999)
    parser.add_argument("--no-genres", action="store_true", help="No consultar géneros del artista (más rápido)")
    parser.add_argument("--out", default="spotify_rock_80s_90s.csv")
    args = parser.parse_args()

    if not args.client_id or not args.client_secret:
        raise SystemExit(
            "Set --client-id/--client-secret or env CLIENT_ID_SPOTIFY / CLIENT_SECRET_SPOTIFY "
            "(SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET are also accepted)"
        )

    token = get_access_token(args.client_id, args.client_secret)

    # 1) Search playlists for each query.
    playlist_ids = set()
    for q in [q.strip() for q in args.queries.split(";") if q.strip()]:
        pls = search_playlists(q, token, limit=args.limit_playlists, pages=args.pages_per_query)
        for p in pls:
            playlist_ids.add(p["id"])

    # 2) Add extra playlists (URLs or IDs); empty pieces such as the one left
    #    by a trailing comma are ignored instead of raising ValueError.
    for s in args.extra_playlists.split(","):
        if s.strip():
            playlist_ids.add(parse_playlist_id(s))

    # Sort so the processing order, and therefore which playlist a duplicated
    # track is attributed to, does not depend on set iteration order.
    playlist_ids = sorted(playlist_ids)
    print(f"Total playlists a procesar: {len(playlist_ids)}")

    # 3) Collect tracks.
    rows = collect_from_playlists(
        playlist_ids,
        token,
        yr_min=args.min_year,
        yr_max=args.max_year,
        fetch_genres=not args.no_genres
    )

    # 4) Write the CSV.
    df = pd.DataFrame(rows, columns=[
        "title","artist","year","language","artist_genres","playlist_name","playlist_id","track_id"
    ])
    df.drop_duplicates(subset=["title","artist","year"], inplace=True)
    df.to_csv(args.out, index=False)
    print(f"Escrito {args.out} con {len(df)} filas.")

# Run the extractor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()