Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import csv | |
| import math | |
| import json | |
| import re | |
| import requests | |
| import pandas as pd | |
| from urllib.parse import urlencode | |
| from langdetect import detect, DetectorFactory | |
| from rapidfuzz import fuzz | |
| from slugify import slugify | |
| from tqdm import tqdm | |
# --------------------------
# Configuration
# --------------------------
# Credentials and client identity come from the environment so that no secret
# has to be committed with the script.
LASTFM_API_KEY = os.getenv("LASTFM_API_KEY", "")  # Required (https://www.last.fm/api/account/create)
DISCOGS_TOKEN = os.getenv("DISCOGS_TOKEN", "")  # Recommended (https://www.discogs.com/settings/developers)
USER_AGENT = os.getenv("USER_AGENT", "rock-rag/1.0 (+https://example.com)")

# Last.fm tags used to harvest rock tracks (80s/90s, English and Spanish)
LASTFM_TAGS = [
    "rock", "classic rock", "rock and roll",
    "hard rock", "alternative rock",
    "rock en español", "spanish rock", "latin rock",
]

# Pagination / limits
LASTFM_PAGE_LIMIT = 50  # items per page (Last.fm: 50 by default; 100 usually works)
LASTFM_MAX_PAGES = 40  # per tag (raise for more; 40 * 50 = 2k tracks per tag)

# Discogs: stay at or below 60 req/min -> a short pause is used between requests

# Target year window (inclusive)
YEAR_MIN, YEAR_MAX = 1980, 1999

# Output CSV. Overridable through the OUTPUT_CSV environment variable, like the
# other settings above; the default is the original hard-coded location.
OUTPUT_CSV = os.getenv(
    "OUTPUT_CSV",
    "/home/smith/Rock_RAG/code/rock_80s_90s_lastfm_discogs.csv",
)
| DetectorFactory.seed = 0 # fixed seed so that langdetect results are deterministic | |
# --------------------------
# Utilities
# --------------------------
def norm_txt(s: str) -> str:
    """Return *s* trimmed, with every run of whitespace squeezed to one space."""
    # str.split() with no separator drops leading/trailing whitespace and
    # splits on whitespace runs, so re-joining yields the normalized text.
    return " ".join(s.split())
def lang_of(title: str, artist: str) -> str:
    """Label a track as ``"Spanish"`` or ``"English"``.

    The title and artist are joined and passed to ``detect``. A detected code
    starting with ``es`` or ``en`` decides the label directly. For any other
    code, the presence of a Spanish accented letter or ``ñ`` yields
    ``"Spanish"``. Every other outcome, including a detection error, falls
    back to ``"English"``.
    """
    combined = f"{title} {artist}"
    try:
        detected = detect(combined)
        for prefix, label in (("es", "Spanish"), ("en", "English")):
            if detected.startswith(prefix):
                return label
        # Heuristic on accented characters
        if re.search(r"[áéíóúñÁÉÍÓÚÑ]", combined) is not None:
            return "Spanish"
    except Exception:
        # Best-effort: any failure simply falls through to the default label.
        pass
    return "English"
def key_for(title: str, artist: str) -> str:
    """Return the slug used as the exact-duplicate key for a (title, artist) pair."""
    # Both parts are whitespace-normalized, joined with "||" and then slugified.
    joined = "||".join((norm_txt(title), norm_txt(artist)))
    return slugify(joined)
def soft_equal(a: str, b: str) -> bool:
    """Return True when *a* and *b* are near-identical after normalization.

    Flexible similarity used to avoid duplicates (titles with parentheses,
    remasters, etc.): both strings are whitespace-normalized and lower-cased,
    then compared with ``fuzz.token_set_ratio``; a score of 95 or more counts
    as equal.
    """
    left = norm_txt(a).lower()
    right = norm_txt(b).lower()
    similarity = fuzz.token_set_ratio(left, right)
    return similarity >= 95
# --------------------------
# Last.fm: tag.getTopTracks
# Docs: https://www.last.fm/api/show/tag.getTopTracks
# --------------------------
def lastfm_get_top_tracks_for_tag(tag: str, api_key: str, page: int, limit: int = LASTFM_PAGE_LIMIT):
    """Fetch one page of ``tag.gettoptracks`` results from Last.fm.

    Args:
        tag: Last.fm tag to query.
        api_key: Last.fm API key.
        page: Page number to request.
        limit: Number of items requested per page.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the response status signals an error.
    """
    query = dict(
        method="tag.gettoptracks",
        tag=tag,
        api_key=api_key,
        format="json",
        page=page,
        limit=limit,
    )
    response = requests.get(
        "https://ws.audioscrobbler.com/2.0/",
        params=query,
        headers={"User-Agent": USER_AGENT},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
def harvest_lastfm(seed_tags=LASTFM_TAGS, api_key=LASTFM_API_KEY, max_pages=LASTFM_MAX_PAGES):
    """Collect unique (title, artist) rows from the top tracks of each tag.

    Args:
        seed_tags: Iterable of Last.fm tags to harvest.
        api_key: Last.fm API key; must be non-empty.
        max_pages: Upper bound on the number of pages fetched per tag.

    Returns:
        A list of dicts with keys ``title``, ``artist`` and ``source_tag``
        (the tag under which the track was first seen). Tracks whose
        ``key_for`` slug was already seen are skipped.

    Raises:
        ValueError: if ``api_key`` is empty.
        requests.HTTPError: propagated from the page requests.
    """
    # Explicit check instead of ``assert``: assertions are removed when Python
    # runs with -O, which would let requests go out without a key.
    if not api_key:
        raise ValueError("Falta LASTFM_API_KEY (entorno). Consigue una en https://www.last.fm/api")

    rows = []
    seen = set()
    for tag in seed_tags:
        # First page tells us how many pages exist for this tag.
        first = lastfm_get_top_tracks_for_tag(tag, api_key, page=1, limit=LASTFM_PAGE_LIMIT)
        attrs = (first.get("tracks") or {}).get("@attr") or {}
        try:
            total_pages = int(attrs.get("totalPages", 1))
        except (TypeError, ValueError):
            total_pages = 1
        total_pages = min(total_pages, max_pages)

        for page in tqdm(range(1, total_pages + 1), desc=f"Last.fm tag:{tag}"):
            if page == 1:
                data = first  # already fetched above; do not request it twice
            else:
                data = lastfm_get_top_tracks_for_tag(tag, api_key, page=page, limit=LASTFM_PAGE_LIMIT)

            items = (data.get("tracks") or {}).get("track") or []
            # A lone result may arrive as a single object rather than a list;
            # iterating a dict would yield its keys, so wrap it.
            if isinstance(items, dict):
                items = [items]

            for it in items:
                if not isinstance(it, dict):
                    continue
                title = norm_txt(it.get("name") or "")
                artist_info = it.get("artist")
                if isinstance(artist_info, dict):
                    artist_name = artist_info.get("name") or ""
                elif isinstance(artist_info, str):
                    artist_name = artist_info
                else:
                    artist_name = ""
                artist = norm_txt(artist_name)
                if not title or not artist:
                    continue
                k = key_for(title, artist)
                if k in seen:
                    continue
                seen.add(k)
                rows.append({
                    "title": title,
                    "artist": artist,
                    "source_tag": tag,
                })
            # Light rate limit between pages (adjust if necessary).
            time.sleep(0.20)
    return rows
# --------------------------
# Discogs: search used to obtain the release year
# Base docs: https://www.discogs.com/developers
# Rate limit: ~60 req/min
# Personal token goes in the header: Authorization: Discogs token=XXXX
# --------------------------
def discogs_search_release(title: str, artist: str, token: str):
    """Run a Discogs database search for releases matching a track.

    The free-text query combines title and artist, and the artist is also sent
    as a filter. Only the first page (50 results) is requested; year filtering
    is left to the caller.

    Args:
        title: Track title.
        artist: Artist name.
        token: Discogs personal token; when empty no Authorization header is sent.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the response status signals an error.
    """
    request_headers = {"User-Agent": USER_AGENT}
    if token:
        request_headers["Authorization"] = f"Discogs token={token}"

    query = {
        "q": f"{title} {artist}",
        "artist": artist,
        "type": "release",
        "per_page": 50,
        "page": 1,
    }
    response = requests.get(
        "https://api.discogs.com/database/search",
        params=query,
        headers=request_headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
def _discogs_year(value):
    """Coerce a Discogs ``year`` field to a positive int, or None if unusable."""
    # Going through str() accepts both ints and numeric strings such as "1991",
    # while rejecting booleans, floats and free text.
    try:
        year = int(str(value).strip())
    except (TypeError, ValueError):
        return None
    return year if year > 0 else None


def guess_year_from_discogs(result_items, title: str, artist: str):
    """Pick the year of the search result that best matches a track.

    Each result with a usable year is scored by:
      1) fuzzy similarity of its title (70%) and artist (30%) to the query;
      2) a +5 bonus when its year lies within [YEAR_MIN, YEAR_MAX].
    The first result reaching the highest score wins.

    Search results may carry ``year`` as a string; years are therefore coerced
    to int before use. Checking ``isinstance(year, int)`` directly would reject
    every such result and make this function always return None.

    Args:
        result_items: Iterable of result dicts (``None`` is treated as empty).
        title: Track title to match.
        artist: Artist name to match.

    Returns:
        The chosen year as an int, or None when no result has a usable year.
    """
    best_year = None
    best_score = -1
    wanted_title = title.lower()
    wanted_artist = artist.lower()

    for it in result_items or []:
        year = _discogs_year(it.get("year"))
        if year is None:
            continue

        # The result title usually comes as "Artist - Title"; split on the
        # first " - " so each side can be compared separately.
        tcand = norm_txt(it.get("title") or "")
        c_artist, sep, c_title = tcand.partition(" - ")
        if sep:
            c_artist, c_title = c_artist.strip(), c_title.strip()
        else:
            c_artist, c_title = "", tcand

        score = fuzz.token_set_ratio(wanted_title, c_title.lower()) * 0.7
        score += fuzz.token_set_ratio(wanted_artist, c_artist.lower()) * 0.3
        # Bonus when the year falls inside the target window.
        if YEAR_MIN <= year <= YEAR_MAX:
            score += 5

        if score > best_score:
            best_score = score
            best_year = year
    return best_year
def enrich_with_discogs_year(rows, token=DISCOGS_TOKEN, rate_sleep=1.1):
    """Return copies of *rows* with a ``year`` key looked up on Discogs.

    The lookup is best-effort: a failure for one row sets its year to None and
    processing continues. Failures are logged as warnings, so systematic
    problems such as a bad token or rate limiting do not pass unnoticed as a
    run full of missing years.

    Args:
        rows: Iterable of dicts with at least ``title`` and ``artist``.
        token: Discogs personal token forwarded to the search call.
        rate_sleep: Seconds to pause after every lookup (~60 requests/min).

    Returns:
        A new list of new dicts; the input rows are not modified.
    """
    import logging  # function-scope: the module does not import logging at the top

    log = logging.getLogger(__name__)
    out = []
    for row in tqdm(rows, desc="Discogs enrich (year)"):
        title, artist = row["title"], row["artist"]
        try:
            data = discogs_search_release(title, artist, token)
            y = guess_year_from_discogs(data.get("results", []), title, artist)
        except Exception as exc:
            # Deliberately broad: one bad lookup must not abort the whole run.
            log.warning("Discogs lookup failed for %r by %r: %s", title, artist, exc)
            y = None
        # Respect the approximate 60/min rate limit.
        time.sleep(rate_sleep)
        out.append({**row, "year": y})
    return out
# --------------------------
# Final filter + deduplication + language tagging
# --------------------------
def dedup_rows(rows):
    """Collapse fuzzy-duplicate rows, merging their ``source_tag`` values.

    Rows are ordered by lower-cased (title, artist). Each row is compared only
    with the most recently kept one. When both title and artist are
    ``soft_equal``, the row is dropped and its tags are merged into the kept
    row's ``source_tag`` (sorted, ``|``-separated). The kept row dicts are the
    input objects themselves, so merged tags are written into them in place.

    Returns:
        The list of kept rows, in sorted order.
    """
    def sort_key(row):
        return (row.get("title", "").lower(), row.get("artist", "").lower())

    def tag_set(row):
        return set((row.get("source_tag") or "").split("|"))

    unique = []
    for current in sorted(rows, key=sort_key):
        previous = unique[-1] if unique else None
        is_duplicate = (
            previous is not None
            and soft_equal(previous["title"], current["title"])
            and soft_equal(previous["artist"], current["artist"])
        )
        if not is_duplicate:
            unique.append(current)
            continue
        # Merge tags, dropping empty fragments.
        merged = tag_set(previous) | tag_set(current)
        previous["source_tag"] = "|".join(sorted(tag for tag in merged if tag))
    return unique
def annotate_language(rows):
    """Set a ``language`` key on every row in place and return the same list.

    The value comes from ``lang_of`` applied to the row's title and artist.
    """
    for entry in rows:
        entry.update(language=lang_of(entry["title"], entry["artist"]))
    return rows
def filter_year_range(rows, year_min=YEAR_MIN, year_max=YEAR_MAX):
    """Return the rows whose ``year`` is an int within [year_min, year_max].

    Rows with a missing or non-int year are dropped; callers that want to keep
    them for later enrichment have to handle that separately.
    """
    def in_range(row):
        year = row.get("year")
        return isinstance(year, int) and year_min <= year <= year_max

    return [row for row in rows if in_range(row)]
# --------------------------
# Main
# --------------------------
def main():
    """Run the pipeline: harvest, enrich, dedup, annotate, filter, write CSV.

    The directory of ``OUTPUT_CSV`` is created before any network work starts.
    The Discogs step pauses after every row, so the run can be long; a missing
    or unwritable destination should surface immediately rather than at the
    final write.

    Raises:
        OSError: if the output directory cannot be created or the CSV cannot
            be written.
    """
    out_dir = os.path.dirname(OUTPUT_CSV)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    print(">> Cosechando Last.fm por tags…")
    rows = harvest_lastfm()
    print(f"Last.fm rows (raw): {len(rows)}")

    print(">> Enriqueciendo con año vía Discogs…")
    rows2 = enrich_with_discogs_year(rows)

    print(">> Deduplicando…")
    rows3 = dedup_rows(rows2)

    print(">> Anotando idioma…")
    rows4 = annotate_language(rows3)

    print(">> Filtrando por año 1980–1999…")
    rows5 = filter_year_range(rows4, YEAR_MIN, YEAR_MAX)
    print(f"Total final (con año en rango): {len(rows5)}")

    # Save the CSV with a fixed column order.
    df = pd.DataFrame(rows5, columns=["title", "artist", "year", "language", "source_tag"])
    df = df.drop_duplicates()
    df.to_csv(OUTPUT_CSV, index=False)
    print(f">> Escrito {OUTPUT_CSV} con {len(df)} filas.")


if __name__ == "__main__":
    main()