Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| # Open Dataset Finder (HF / Zenodo / Kaggle) with Gradio MCP enabled | |
| import os, io, re, html, time, csv, subprocess, string, typing as T, json | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| import requests | |
| import pandas as pd | |
| from rapidfuzz import fuzz | |
| from rank_bm25 import BM25Okapi | |
| from huggingface_hub import list_datasets, HfApi | |
| import gradio as gr | |
| # -------------------- Common Utilities -------------------- | |
def to_dt_str(x) -> str:
    """Convert a datetime or a date-like string into ``YYYY-MM-DD``.

    Args:
        x: A ``datetime``, a string, or any other value. Falsy values
            (``None``, ``""``, ``0``) are treated as "no date".

    Returns:
        ``""`` for falsy input; the formatted date when ``x`` is a
        ``datetime`` or matches one of the known layouts; otherwise the
        first ten characters of ``str(x)`` unchanged. That fallback is what
        turns ISO timestamps with fractional seconds or ``+hh:mm`` offsets
        into their date part, but it also means unparseable text is passed
        through truncated rather than rejected.
    """
    if not x:
        return ""
    if isinstance(x, datetime):
        return x.strftime("%Y-%m-%d")

    s = str(x)
    # Every "Z" is dropped before parsing so that "...T12:00:00Z" can match
    # the naive ISO layout below.
    candidate = s.replace("Z", "")
    layouts = (
        "%Y-%m-%d",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S",
        "%Y/%m/%d",
        "%d/%m/%Y",
    )
    for fmt in layouts:
        try:
            return datetime.strptime(candidate, fmt).strftime("%Y-%m-%d")
        except ValueError:
            # Layout mismatch is expected; anything else should surface.
            continue
    return s[:10]
# Maps every ASCII punctuation character to a space in a single pass.
_PUNCT_TO_SPACE = str.maketrans(string.punctuation, " " * len(string.punctuation))


def tokenize(s: str) -> T.List[str]:
    """Split text into lowercase word tokens.

    The text is lowercased, every character in ``string.punctuation`` is
    replaced by a space, and the result is split on whitespace.

    Args:
        s: Text to tokenize; ``None`` or ``""`` yields an empty list.

    Returns:
        The list of non-empty tokens in their original order.
    """
    return (s or "").lower().translate(_PUNCT_TO_SPACE).split()
| # -------------------- Standard Schema -------------------- | |
@dataclass
class Row:
    """One dataset search hit, normalised to a shape shared by all sources.

    The ``@dataclass`` decorator supplies the positional constructor that
    every search function in this file relies on
    (``Row(source, id, title, description, updated, url, download_url, formats)``).
    """

    source: str  # provider tag: "huggingface", "zenodo" or "kaggle"
    id: str  # identifier of the dataset within that provider
    title: str  # display title
    description: str  # free-text description; may be empty
    updated: str  # last-update date as produced by the search functions
    url: str  # landing page of the dataset
    download_url: str  # download link when one is known, else ""
    formats: T.List[str]  # file-format labels; may be empty
| # -------------------- Hugging Face (datasets) -------------------- | |
def _hf_first_attr(obj, names):
    """Return the first truthy attribute of ``obj`` among ``names``, else ``None``."""
    for name in names:
        value = getattr(obj, name, None)
        if value:
            return value
    return None


def search_hf(q, limit=40):
    """Search Hugging Face datasets and enrich each hit with its card data.

    Args:
        q: Search string passed to ``list_datasets``.
        limit: Maximum number of datasets requested.

    Returns:
        A list of ``Row`` objects with ``source="huggingface"``. The list is
        empty when the ``list_datasets`` call itself fails (the error is
        printed). A failure while enriching one dataset is printed and that
        dataset is still returned with whatever was known from the listing.
    """
    # NOTE(review): attribute names differ between huggingface_hub releases
    # (camelCase vs snake_case); both spellings are probed. Confirm against
    # the version pinned for this app.
    modified_attrs = ("last_modified", "lastModified", "updated_at")
    card_attrs = ("card_data", "cardData")

    out = []
    api = HfApi()
    try:
        ds_list = list_datasets(search=q, limit=limit)
    except Exception as e:
        print("HF list_datasets error:", e)
        return out

    for d in ds_list:
        ds_id = getattr(d, "id", None) or ""
        url = f"https://huggingface.co/datasets/{ds_id}"
        updated = to_dt_str(_hf_first_attr(d, modified_attrs))
        desc = ""
        try:
            info = api.dataset_info(ds_id, timeout=15)
            card = _hf_first_attr(info, card_attrs) or {}
            # Card data may be an object rather than a plain dict; convert it
            # when it offers to_dict() so the description lookup still works.
            if not isinstance(card, dict) and callable(getattr(card, "to_dict", None)):
                card = card.to_dict()
            raw_desc = card.get("description") if isinstance(card, dict) else ""
            desc = str(raw_desc) if raw_desc else ""
            updated = to_dt_str(_hf_first_attr(info, modified_attrs)) or updated
        except Exception as e:
            # Enrichment is best-effort: keep the listing data, but say why.
            print(f"HF dataset_info error for {ds_id}:", e)
        # The dataset id doubles as the title; no download URL or formats are known here.
        out.append(Row("huggingface", ds_id, ds_id, desc, updated, url, "", []))
    return out
| # -------------------- Zenodo -------------------- | |
# Default per-request timeout (seconds) for safe_get.
SAFE_TIMEOUT = 20
# Headers sent with every request made through safe_get.
UA = {"User-Agent": "OpenDatasetFinder/mini/0.2 (+HF Space)"}


def safe_get(url, params=None, timeout=SAFE_TIMEOUT, retries=2):
    """GET ``url`` with retries and return the successful response.

    Args:
        url: Address to fetch.
        params: Optional query parameters forwarded to ``requests.get``.
        timeout: Timeout in seconds forwarded to ``requests.get``.
        retries: Number of additional attempts after the first one. Negative
            values are treated as 0, so at least one request is always made
            and the function never falls through returning ``None``.

    Returns:
        The ``requests`` response whose status passed ``raise_for_status()``.

    Raises:
        requests.RequestException: The error from the last attempt, once all
            attempts have failed.
    """
    attempts = max(0, retries) + 1
    for attempt in range(attempts):
        try:
            r = requests.get(url, params=params, headers=UA, timeout=timeout)
            r.raise_for_status()
            return r
        except requests.RequestException:
            if attempt == attempts - 1:
                raise
            # Linear back-off: 1.2 s, 2.4 s, ...
            time.sleep(1.2 * (attempt + 1))
def _zenodo_file_format(f):
    """Return a short format label for one Zenodo file entry, or ``""``."""
    declared = f.get("type") or f.get("mimetype") or ""
    if declared:
        # "text/csv" -> "csv"; a bare "csv" stays as it is.
        return str(declared).split("/")[-1]
    # NOTE(review): some record payloads appear to carry only a file name
    # (under "key" or "filename") -- confirm against the Zenodo API response.
    name = str(f.get("key") or f.get("filename") or "")
    return name.rsplit(".", 1)[-1].lower() if "." in name else ""


def search_zenodo(q, limit=40):
    """Search Zenodo records of type ``dataset``.

    Args:
        q: Query string sent as the ``q`` parameter.
        limit: Page size sent as the ``size`` parameter.

    Returns:
        A list of ``Row`` objects with ``source="zenodo"``. Descriptions have
        HTML entities unescaped and tags replaced by spaces. ``formats`` is a
        sorted, de-duplicated list so the output is stable between runs.

    Raises:
        Whatever ``safe_get`` or ``Response.json`` raise; the caller is
        expected to handle request failures.
    """
    base = "https://zenodo.org/api/records"
    r = safe_get(base, params={"q": q, "type": "dataset", "size": limit})
    payload = r.json() or {}
    hits = (payload.get("hits") or {}).get("hits") or []

    out = []
    for h in hits:
        md = h.get("metadata") or {}
        title = md.get("title") or h.get("title") or ""
        desc = re.sub(r"<[^>]+>", " ", html.unescape(md.get("description") or "")).strip()
        url = (h.get("links") or {}).get("html", "")
        # Ignore anything that is not a dict so one odd entry cannot abort the search.
        files = [f for f in (h.get("files") or []) if isinstance(f, dict)]
        fmts = sorted({_zenodo_file_format(f) for f in files} - {""})
        # `links` may be present but null, hence `or {}` rather than a .get default.
        dl = (files[0].get("links") or {}).get("self", "") if files else ""
        upd = to_dt_str(h.get("updated"))
        out.append(Row("zenodo", str(h.get("id") or ""), title, desc, upd, url, dl, fmts))
    return out
| # -------------------- Kaggle (env creds auto) -------------------- | |
def ensure_kaggle_credentials():
    """Create ``~/.kaggle/kaggle.json`` from environment variables if needed.

    Does nothing when the file already exists, or when either
    ``KAGGLE_USERNAME`` or ``KAGGLE_KEY`` is unset or empty. Otherwise the
    directory is created and the credentials are written as JSON.

    The file is opened with mode ``0o600`` at creation time, so the API key
    is never on disk with wider permissions, not even briefly.
    """
    path = os.path.expanduser("~/.kaggle/kaggle.json")
    if os.path.exists(path):
        return
    user = os.environ.get("KAGGLE_USERNAME")
    key = os.environ.get("KAGGLE_KEY")
    if not (user and key):
        return

    os.makedirs(os.path.dirname(path), exist_ok=True)
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump({"username": user, "key": key}, f)
    # Re-assert the mode in case the file appeared between the existence
    # check and the open, in which case the creation mode was not applied.
    os.chmod(path, 0o600)
def kaggle_available():
    """Report whether Kaggle credentials can be found.

    Returns:
        ``True`` when both ``KAGGLE_USERNAME`` and ``KAGGLE_KEY`` are set to
        non-empty values, or when ``~/.kaggle/kaggle.json`` exists;
        ``False`` otherwise.
    """
    env = os.environ
    if env.get("KAGGLE_USERNAME") and env.get("KAGGLE_KEY"):
        return True
    return os.path.exists(os.path.expanduser("~/.kaggle/kaggle.json"))
# Upper bound (seconds) for the `kaggle` CLI call so a stuck process cannot hang a search.
_KAGGLE_CLI_TIMEOUT = 60


def _kaggle_rows_via_api(q, limit):
    """Search through the Kaggle Python client; raises if the client is unusable."""
    # Imported lazily so the app still starts when the kaggle package is absent.
    from kaggle.api.kaggle_api_extended import KaggleApi

    api = KaggleApi()
    api.authenticate()
    try:
        api_res = api.dataset_list(search=q, page=1)
    except TypeError:
        api_res = []

    rows = []
    for d in (api_res or [])[:limit]:
        # Description and update date are optional extras per dataset.
        try:
            m = api.dataset_view(d.ref)
            desc = (getattr(m, "description", "") or "").strip()
            upd = to_dt_str(getattr(m, "lastUpdated", None))
        except Exception:
            desc, upd = "", ""
        # File extensions are optional extras as well.
        fmts = []
        try:
            files = api.dataset_list_files(d.ref).files
            fmts = sorted(
                {f.name.split(".")[-1].lower() for f in files if "." in f.name} - {""}
            )
        except Exception:
            fmts = []
        url = f"https://www.kaggle.com/datasets/{d.ref}"
        rows.append(Row("kaggle", d.ref, d.title or d.ref, desc, upd, url, url, fmts))
    return rows


def _kaggle_rows_via_cli(q, limit):
    """Search through the `kaggle` command-line tool and parse its CSV output."""
    cli = subprocess.run(
        ["kaggle", "datasets", "list", "-s", q, "--csv", "-p", "1", "-r", str(max(20, min(100, limit)))],
        capture_output=True,
        text=True,
        timeout=_KAGGLE_CLI_TIMEOUT,
    )
    if cli.returncode != 0 or not cli.stdout.strip():
        return []

    rows = []
    for i, r in enumerate(csv.DictReader(io.StringIO(cli.stdout))):
        if i >= limit:
            break
        title = r.get("title") or ""
        url = r.get("url") or ""
        if "/datasets/" in url:
            ref = "/".join(url.rstrip("/").split("/")[-2:])
        elif url:
            ref = url
        else:
            # NOTE(review): the CLI CSV may expose a "ref" column instead of
            # "url" -- confirm against the installed kaggle CLI version.
            ref = (r.get("ref") or "").strip()
            url = f"https://www.kaggle.com/datasets/{ref}" if ref else ""
        rows.append(Row(
            "kaggle",
            ref,
            title,
            (r.get("subtitle") or "").strip(),
            (r.get("lastUpdated") or "")[:10],
            url,
            url,
            [],
        ))
    return rows


def search_kaggle(q, limit=40):
    """Search Kaggle datasets: Python API first, CLI as fallback.

    Args:
        q: Search string.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``Row`` objects with ``source="kaggle"``. If the API path
        fails or yields nothing, the CLI is tried; rows from a failed API
        attempt are discarded so the fallback cannot produce duplicates. If
        both paths fail the list is empty. Failures are printed, never raised.
    """
    try:
        # Credentials are materialised before the kaggle package is imported.
        ensure_kaggle_credentials()
        rows = _kaggle_rows_via_api(q, limit)
        if rows:
            return rows
    except Exception as e:
        print("Kaggle API search failed, trying CLI:", e)

    try:
        return _kaggle_rows_via_cli(q, limit)
    except Exception as e:
        print("Kaggle CLI search failed:", e)
        return []
| # -------------------- Ranking -------------------- | |
def rank(q: str, rows: "T.List[Row]"):
    """Score search hits against the query and return them best-first.

    The score of each row is ``0.6 * bm25 + 0.35 * fuzzy + 0.05 * recency``:

    * bm25: BM25 score of title + description, divided by the best score.
    * fuzzy: ``fuzz.token_set_ratio`` of the query against the same text, 0..1.
    * recency: 1.0 for a row updated today, falling linearly to 0.0 at one
      year or older; 0.0 when ``updated`` is empty or not ``YYYY-MM-DD``.

    Args:
        q: The user's query.
        rows: Hits collected from the sources; may be empty.

    Returns:
        A ``pandas.DataFrame`` with columns source, id, title, description
        (cut to 500 characters), updated, url, download_url, formats
        (comma-joined) and score (rounded to 4 places), sorted by score
        descending with a fresh index. Empty input gives an empty frame with
        the same columns.
    """
    from datetime import timezone

    columns = ["source", "id", "title", "description", "updated", "url", "download_url", "formats", "score"]
    if not rows:
        return pd.DataFrame(columns=columns)

    texts = [r.title + " " + r.description for r in rows]
    bm = BM25Okapi([tokenize(t) for t in texts]).get_scores(tokenize(q))
    # Normalise by the best score only when it is positive. BM25 scores can
    # be zero or negative on very small result sets, and dividing by such a
    # maximum would invert or blow up the ordering.
    top = max(bm) if len(bm) > 0 else 1.0
    denom = (top if top > 0 else 1.0) + 1e-9

    # Naive UTC "now", comparable with the naive dates parsed below.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    scored = []
    for i, r in enumerate(rows):
        fz = fuzz.token_set_ratio(q, texts[i]) / 100.0
        rec = 0.0
        if r.updated:
            try:
                age_days = (now - datetime.strptime(r.updated, "%Y-%m-%d")).days
            except (ValueError, TypeError):
                age_days = None
            if age_days is not None:
                # Clamp to [0, 365] so a future-dated row cannot score above 1.0.
                rec = 1.0 - min(max(age_days, 0), 365) / 365.0
        score = 0.6 * (bm[i] / denom) + 0.35 * fz + 0.05 * rec
        scored.append([
            r.source,
            r.id,
            r.title,
            r.description[:500],
            r.updated,
            r.url,
            r.download_url,
            ", ".join(r.formats),
            round(float(score), 4),
        ])

    df = pd.DataFrame(scored, columns=columns)
    return df.sort_values("score", ascending=False).reset_index(drop=True)
| # -------------------- Gradio UI -------------------- | |
# Builds the whole UI inside one Blocks context; `demo` is what gets launched.
# NOTE(review): the nesting of the components below is reconstructed from the
# statement order -- confirm the layout against the running app.
with gr.Blocks(title="Open Dataset Finder (HF • Zenodo • Kaggle)") as demo:
    # Introductory help text shown at the top of the page.
    gr.Markdown(
        """
# 🔍 Open Dataset Finder
This app lets you search datasets from multiple open data sources.
- **Hugging Face Datasets**: Public machine learning datasets for NLP, computer vision, speech, and more.
- **Zenodo**: Research datasets shared by scientists and institutions, often linked to academic publications.
- **Kaggle**: Community datasets, competition datasets, and practice datasets shared on Kaggle.
### Kaggle authentication
To enable Kaggle search, you need to add your Kaggle API credentials as **Repository secrets** in the Space settings:
- `KAGGLE_USERNAME`: your Kaggle username
- `KAGGLE_KEY`: your Kaggle API token (found in the `kaggle.json` file you can download from your Kaggle account)
Once the secrets are set, you can check the Kaggle box in the UI and search Kaggle datasets directly here.
### Source repository
- GitHub: https://github.com/hyeonseo2/dataset-search-mcp
"""
    )
    # Query text and the per-source result count.
    with gr.Row():
        q = gr.Textbox(label="Query / Idea", value="korean weather")
        k = gr.Slider(10, 200, value=40, step=10, label="Results per source")
    # One checkbox per source; Kaggle starts unchecked.
    with gr.Row():
        use_hf = gr.Checkbox(value=True, label="Hugging Face")
        use_zen = gr.Checkbox(value=True, label="Zenodo")
        use_kg = gr.Checkbox(value=False, label="Kaggle")
    btn = gr.Button("Search", variant="primary")
    # Outputs: the ranked results table and a plain-text progress/error log.
    out = gr.Dataframe(wrap=True)
    log = gr.Textbox(label="Logs", lines=8)
    def do_search(q_, k_, u_hf, u_zen, u_kg):
        """Search the selected sources and return ranked results plus a log.

        Args:
            q_: Query text.
            k_: Results requested per source; converted with ``int()``.
            u_hf: Whether to search Hugging Face.
            u_zen: Whether to search Zenodo.
            u_kg: Whether to search Kaggle.

        Returns:
            A tuple ``(df, log_text)``: the frame produced by ``rank`` and
            the log lines joined with newlines. An error in one source is
            recorded in the log and does not stop the other sources.
        """
        logs=[]
        rows=[]
        try:
            if u_hf:
                logs.append("Searching Hugging Face…")
                rows+=search_hf(q_, int(k_))
        except Exception as e:
            logs.append(f"HF error: {e}")
        try:
            if u_zen:
                logs.append("Searching Zenodo…")
                rows+=search_zenodo(q_, int(k_))
        except Exception as e:
            logs.append(f"Zenodo error: {e}")
        if u_kg:
            # Kaggle is only attempted when credentials are discoverable.
            if kaggle_available():
                try:
                    logs.append("Searching Kaggle…")
                    rows+=search_kaggle(q_, int(k_))
                except Exception as e:
                    logs.append(f"Kaggle error: {e}")
            else:
                logs.append("No Kaggle credentials found → skipped")
        # Rows from all sources are merged and ranked together.
        df=rank(q_, rows)
        logs.append(f"Total {len(df)} results")
        return df, "\n".join(logs)
    # Wire the button: five inputs in, table and log out.
    btn.click(do_search, inputs=[q,k,use_hf,use_zen,use_kg], outputs=[out, log])
| # -------------------- Run (Gradio + MCP) -------------------- | |
if __name__ == "__main__":
    # Launch settings gathered in one place: listen on every interface at
    # port 7860, show errors in the UI, keep debug off, enable the MCP server.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
        "debug": False,
        "mcp_server": True,
    }
    queued_app = demo.queue()
    queued_app.launch(**launch_options)