"""Dataset Lineage Explorer — FastAPI + SQLite (read-only) + vis-network. - Overview: analysis dashboard (relationship-type mix, most-copied, most-derived, most-translated, most-prolific producers) over the inferred lineage graph. - Explorer: search/pick a dataset -> interactive lineage graph + related lists. - API: GET /related?dataset=, /api/graph, /analysis, /examples, /stats. DB is opened read-only; lineage.db is baked into the repo (small, robust). Edward Tufte-inspired styling. """ import json import sqlite3 from functools import lru_cache from pathlib import Path from fastapi import FastAPI, Query, Request from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import HTMLResponse, JSONResponse # Prefer the bucket-mounted DB (/data/lineage.db, persistent + can grow); # fall back to the baked-in copy if the mount isn't present (cold-mount fail, # local dev, etc.) so the Space stays up either way. BUCKET_DB = Path("/data/lineage.db") LOCAL_DB = Path(__file__).parent / "lineage.db" DB = BUCKET_DB if BUCKET_DB.exists() else LOCAL_DB print(f"using DB: {DB} (bucket-mount={BUCKET_DB.exists()})", flush=True) # High safety ceiling only — stops a depth-2 hub exploding to tens of thousands # of nodes. Normal hubs (incl. FLAN's ~1.4k children) render fully. 
# Node cap enforced by graph() while it expands ancestors and descendants.
MAX_NODES = 2500

# Bound for the per-dataset lookup caches. The cache keys come straight from
# request parameters, so an unbounded cache would grow with every distinct
# (possibly bogus) dataset id a client sends.
_LOOKUP_CACHE_SIZE = 32768

# Largest `n` honoured by /examples. SQLite treats a negative LIMIT as
# "no limit", so the request value must be clamped before it reaches the query.
_MAX_EXAMPLES = 1000

# Path prefixes whose successful responses may be cached by clients/proxies.
_CACHEABLE_PREFIXES = ("/api", "/related", "/examples", "/stats", "/analysis")

app = FastAPI(title="Dataset Lineage Explorer")
app.add_middleware(GZipMiddleware, minimum_size=500)

# Read-only connection shared by every handler. The URI is built with
# Path.as_uri() so characters that are special in a URI ('?', '#', '%') in the
# database path are percent-encoded instead of truncating or corrupting it.
_con = sqlite3.connect(
    f"{DB.absolute().as_uri()}?mode=ro", uri=True, check_same_thread=False
)
_con.row_factory = sqlite3.Row


@app.middleware("http")
async def cache_headers(request: Request, call_next):
    """Add a one-day public Cache-Control header to successful data responses.

    Only 200 responses are marked cacheable, so a validation error or a
    transient server error is never pinned in a shared cache for a day.
    """
    resp = await call_next(request)
    if resp.status_code == 200 and request.url.path.startswith(_CACHEABLE_PREFIXES):
        resp.headers["Cache-Control"] = "public, max-age=86400"
    return resp


@lru_cache(maxsize=_LOOKUP_CACHE_SIZE)
def _meta(dataset: str) -> dict:
    """Return the `nodes` row for *dataset* as a dict.

    Unknown ids get a placeholder with zero downloads/likes and an empty
    author. The returned dict is shared through the cache; callers must treat
    it as read-only.
    """
    row = _con.execute("SELECT * FROM nodes WHERE dataset_id=?", (dataset,)).fetchone()
    if row is None:
        return {"dataset_id": dataset, "downloads": 0, "likes": 0, "author": ""}
    return dict(row)


@lru_cache(maxsize=_LOOKUP_CACHE_SIZE)
def parents(dataset: str) -> list[dict]:
    """Return edges whose child is *dataset*, highest confidence first.

    Each item's ``id`` is the parent dataset. The list is shared through the
    cache; callers must treat it as read-only.
    """
    rows = _con.execute(
        "SELECT parent AS id, primary_type, confidence, tags, size_ratio, source "
        "FROM edges WHERE child=? ORDER BY confidence DESC",
        (dataset,),
    ).fetchall()
    return [dict(r) for r in rows]


@lru_cache(maxsize=_LOOKUP_CACHE_SIZE)
def children(dataset: str) -> list[dict]:
    """Return edges whose parent is *dataset*, highest confidence first.

    Each item's ``id`` is the child dataset. The list is shared through the
    cache; callers must treat it as read-only.
    """
    rows = _con.execute(
        "SELECT child AS id, primary_type, confidence, tags, size_ratio, source "
        "FROM edges WHERE parent=? ORDER BY confidence DESC",
        (dataset,),
    ).fetchall()
    return [dict(r) for r in rows]


def siblings(dataset: str) -> list[dict]:
    """Return up to 80 other datasets that share a parent with *dataset*.

    ``via`` names the shared parent. The query has no ORDER BY, so which 80
    rows are returned when more exist is left to SQLite.
    """
    rows = _con.execute(
        "SELECT DISTINCT e2.child AS id, e2.parent AS via, e2.primary_type "
        "FROM edges e1 JOIN edges e2 ON e1.parent=e2.parent "
        "WHERE e1.child=? AND e2.child!=? LIMIT 80",
        (dataset, dataset),
    ).fetchall()
    return [dict(r) for r in rows]


def related(dataset: str) -> dict:
    """Bundle metadata, parents, children and siblings for *dataset*."""
    return {
        "dataset": dataset,
        "meta": _meta(dataset),
        "parents": parents(dataset),
        "children": children(dataset),
        "siblings": siblings(dataset),
    }


@app.get("/related")
def related_endpoint(dataset: str = Query(..., description="dataset id, e.g. tatsu-lab/alpaca")):
    """JSON of parents/children/siblings — for metadata workflows."""
    return JSONResponse(related(dataset))


def _walk(start: str, hops: int, neighbours, upward: bool, nodes: dict, edges: dict) -> None:
    """Expand breadth-first from *start* in one direction, filling *nodes* and *edges*.

    *neighbours* is ``parents`` or ``children``; *upward* says which end of the
    edge the neighbour sits on. *nodes* is used as an insertion-ordered set and
    *edges* maps ``(parent, child)`` to ``(type, confidence, source)``; a
    repeated edge overwrites the earlier value. Expansion stops as soon as
    *nodes* holds MAX_NODES entries.
    """
    frontier = [start]
    for _ in range(hops):
        next_frontier = []
        for current in frontier:
            for rel in neighbours(current):
                other = rel["id"]
                key = (other, current) if upward else (current, other)
                edges[key] = (rel["primary_type"], rel["confidence"], rel["source"])
                # Edges to nodes dropped by the cap are filtered out by the caller.
                if other not in nodes and len(nodes) < MAX_NODES:
                    nodes[other] = None
                    next_frontier.append(other)
        frontier = next_frontier
        if not frontier or len(nodes) >= MAX_NODES:
            break


@app.get("/api/graph")
def graph(dataset: str, depth: int = 2):
    """Return the lineage neighbourhood of *dataset* as vis-network nodes/edges.

    Ancestors and descendants are each followed for *depth* hops (clamped to
    1..4), with at most MAX_NODES nodes in total. Nodes are listed in discovery
    order, the focus dataset first, so the response is reproducible.
    """
    hops = max(1, min(depth, 4))
    nodes: dict = {dataset: None}
    edges: dict = {}
    _walk(dataset, hops, parents, True, nodes, edges)
    _walk(dataset, hops, children, False, nodes, edges)

    node_list = []
    for n in nodes:
        # A NULL/missing download count would break the ',' format spec below.
        downloads = _meta(n).get("downloads") or 0
        node_list.append({
            "id": n,
            "label": n.split("/")[-1],
            "title": f"{n} ({downloads:,} dl)",
            "downloads": downloads,
            "focus": n == dataset,
        })
    edge_list = [
        {"from": a, "to": b, "type": t, "confidence": cf, "source": s}
        for (a, b), (t, cf, s) in edges.items()
        if a in nodes and b in nodes
    ]
    return {"nodes": node_list, "edges": edge_list}


@lru_cache(maxsize=1)
def _stats():
    """Return headline counts: nodes, edges, inferred edges, distinct inferred children.

    Computed once per process and then served from the cache.
    """
    n = _con.execute("SELECT count(*) FROM nodes").fetchone()[0]
    e = _con.execute("SELECT count(*) FROM edges").fetchone()[0]
    inf = _con.execute("SELECT count(*) FROM edges WHERE source='inferred'").fetchone()[0]
    deriv = _con.execute(
        "SELECT count(DISTINCT child) FROM edges WHERE source='inferred'"
    ).fetchone()[0]
    return {"nodes": n, "edges": e, "inferred": inf, "derivatives": deriv}


@lru_cache(maxsize=8)
def _examples(n: int):
    """Return the *n* parents with the most inferred derivatives, largest first.

    *n* is passed to SQLite as LIMIT; callers are expected to pass a
    non-negative value (the /examples handler clamps it).
    """
    rows = _con.execute(
        "SELECT parent AS id, count(*) AS derivatives FROM edges "
        "WHERE source='inferred' GROUP BY parent ORDER BY derivatives DESC LIMIT ?",
        (n,),
    ).fetchall()
    return [dict(r) for r in rows]


@lru_cache(maxsize=1)
def _analysis():
    """Return the aggregate tables shown on the overview dashboard.

    Computed once per process and then served from the cache.
    """
    def top(sql, *a):
        return [dict(r) for r in _con.execute(sql, a).fetchall()]

    return {
        "type_dist": top(
            "SELECT primary_type t, count(*) n FROM edges WHERE source='inferred' "
            "GROUP BY t ORDER BY n DESC"),
        "most_copied": top(
            "SELECT parent id, count(*) n FROM edges WHERE primary_type='exact_copy' "
            "GROUP BY parent ORDER BY n DESC LIMIT 12"),
        "most_derived": top(
            "SELECT parent id, count(*) n FROM edges WHERE source='inferred' "
            "GROUP BY parent ORDER BY n DESC LIMIT 12"),
        "most_translated": top(
            "SELECT parent id, count(*) n FROM edges WHERE primary_type='translation' "
            "GROUP BY parent ORDER BY n DESC LIMIT 10"),
        # Producer = the part of the child id before the first '/'.
        "top_orgs": top(
            "SELECT substr(child,1,instr(child,'/')-1) id, count(*) n FROM edges "
            "WHERE source='inferred' AND instr(child,'/')>0 GROUP BY id ORDER BY n DESC LIMIT 12"),
    }


@lru_cache(maxsize=1)
def _map():
    """Overview map: multi-level chains (a derivative that is itself derived-from)
    + the top fan-out hubs. Nodes sized by derivative count.

    Computed once per process and then served from the cache.
    """
    counts = dict(_con.execute(
        "SELECT parent, count(*) FROM edges WHERE source='inferred' GROUP BY parent"
    ).fetchall())
    backbone = _con.execute(
        "SELECT parent, child, primary_type, confidence FROM edges WHERE source='inferred' "
        "AND child IN (SELECT DISTINCT parent FROM edges WHERE source='inferred')"
    ).fetchall()

    # dict used as an insertion-ordered set so the node order is reproducible.
    nodeset: dict = {}
    edges = []
    for r in backbone:
        nodeset[r["parent"]] = None
        nodeset[r["child"]] = None
        edges.append({"from": r["parent"], "to": r["child"],
                      "type": r["primary_type"], "confidence": r["confidence"]})
    # Add the 60 parents with the most inferred derivatives, even if they are
    # not part of any multi-level chain.
    for pid in sorted(counts, key=counts.get, reverse=True)[:60]:
        nodeset[pid] = None

    nodes = [{"id": n, "label": n.split("/")[-1],
              "derivatives": counts.get(n, 0),
              "downloads": _meta(n)["downloads"]} for n in nodeset]
    return {"nodes": nodes, "edges": edges}


@app.get("/api/map")
def api_map():
    """Serve the cached overview map."""
    return _map()


@app.get("/stats")
def stats():
    """Serve the cached headline counts."""
    return _stats()


@app.get("/examples")
def examples(n: int = 24):
    """Serve the top-*n* most-derived parents; *n* is clamped to 0.._MAX_EXAMPLES.

    Without the clamp a negative *n* becomes ``LIMIT -1``, which SQLite reads
    as "no limit" and which would return every parent in the table.
    """
    return _examples(max(0, min(n, _MAX_EXAMPLES)))


@app.get("/analysis")
def analysis():
    """Aggregate analysis of the inferred lineage graph."""
    return _analysis()


@app.get("/", response_class=HTMLResponse)
def index():
    """Render the single-page UI by substituting JSON into INDEX_HTML placeholders."""
    # NOTE(review): json.dumps does not escape "</"; if the placeholders sit
    # inside a <script> element, confirm no stored value can contain "</script>".
    return (INDEX_HTML
            .replace("__STATS__", json.dumps(_stats()))
            .replace("__EXAMPLES__", json.dumps(_examples(24)))
            .replace("__ANALYSIS__", json.dumps(_analysis())))


# NOTE(review): the template text below contains none of the __STATS__ /
# __EXAMPLES__ / __ANALYSIS__ placeholders that index() substitutes — confirm
# the markup is complete.
INDEX_HTML = """ Dataset Lineage Explorer

Dataset Lineage Explorer

"""

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)