"""
Concept Atlas — Embedding Generator
=====================================
Generates sentence embeddings for each focus concept
(mensch, verhalten, evolution) and pushes embeddings + metadata
to a HuggingFace public dataset.

Space:   https://huggingface.co/spaces/deirdosh/curriculum_analysis_german
Dataset: https://huggingface.co/datasets/deirdosh/curriculum_embeddings
"""

# ── stdlib ───────────────────────────────────────────────────────────────────
import hashlib
import json
import logging
import os
import shutil
import sys
import threading
import time
import traceback
import urllib.request
import warnings
from collections import deque
from datetime import datetime
from pathlib import Path

# Suppress every warning process-wide.
warnings.filterwarnings("ignore")

LOG_FILE = Path("pipeline.log")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(str(LOG_FILE), mode="a", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

# ── third-party ──────────────────────────────────────────────────────────────
import numpy as np
import pandas as pd
import gradio as gr

# ── lazy heavy imports ────────────────────────────────────────────────────────
_ST_MODEL = None  # SentenceTransformer instance, created on first use

# ── directories ──────────────────────────────────────────────────────────────
CACHE_DIR = Path("cache")
DATA_DIR = Path("data")
for _d in (CACHE_DIR, DATA_DIR):
    _d.mkdir(parents=True, exist_ok=True)

# ── constants ─────────────────────────────────────────────────────────────────
FOCUS_CONCEPTS = ["mensch", "verhalten", "evolution"]
MODEL_NAME = "paraphrase-multilingual-mpnet-base-v2"

# Source corpus CSV (this Space's own file)
CSV_URL = (
    "https://huggingface.co/spaces/deirdosh/curriculum_analysis_german"
    "/resolve/main/data/curriculum_excerpts.csv"
)

# Target HuggingFace dataset repository
HF_DATASET_REPO = "deirdosh/curriculum_embeddings"

# ── pipeline state ─────────────────────────────────────────────────────────
_P: dict = {}
_PIPELINE_RUNNING = threading.Event()
_PIPELINE_THREAD: threading.Thread | None = None

# Live log ring-buffer for the UI ticker.  A bounded deque drops the oldest
# line automatically once _MAX_LOG entries are held.
_MAX_LOG = 400
_LOG_LINES: deque[str] = deque(maxlen=_MAX_LOG)
_LOG_LOCK = threading.Lock()


class _UILogHandler(logging.Handler):
    """Logging handler that mirrors formatted records into ``_LOG_LINES``."""

    def emit(self, record: logging.LogRecord) -> None:
        try:
            line = self.format(record)
        except Exception:
            # Standard logging contract: never let a formatting error escape.
            self.handleError(record)
            return
        with _LOG_LOCK:
            _LOG_LINES.append(line)


_ui_handler = _UILogHandler()
_ui_handler.setFormatter(
    logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S"))
logging.getLogger(__name__).addHandler(_ui_handler)


def get_live_log() -> str:
    """Return the most recent 100 buffered log lines joined by newlines."""
    with _LOG_LOCK:
        tail = list(_LOG_LINES)[-100:]
    return "\n".join(tail)


# =============================================================================
# ATOMIC CACHE HELPERS
# =============================================================================
def _ckey(logical: str) -> str:
    """Return a 10-hex-char MD5 digest of *logical* (a name hash, not security)."""
    return hashlib.md5(logical.encode()).hexdigest()[:10]


def _cache_path(key: str, suffix: str) -> Path:
    """Map a logical cache key to ``CACHE_DIR/<sanitised key>__<hash><suffix>``."""
    safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in key)[:60]
    return CACHE_DIR / f"{safe}__{_ckey(key)}{suffix}"


def _npy_path(key: str) -> Path:
    """Cache location of the ``.npy`` file for *key*."""
    return _cache_path(key, ".npy")


def _json_path(key: str) -> Path:
    """Cache location of the ``.json`` file for *key*."""
    return _cache_path(key, ".json")


def _parquet_path(key: str) -> Path:
    """Cache location of the ``.parquet`` file for *key*."""
    return _cache_path(key, ".parquet")


def _atomic_write(dest: Path, writer) -> None:
    """Call ``writer(tmp)`` on a sibling temp file, then rename it onto *dest*.

    ``os.replace`` overwrites an existing *dest* in a single step, so readers
    never observe a half-written file.  The temp file is removed on failure
    and the original exception is re-raised.
    """
    tmp = dest.with_name(f"{dest.stem}__tmp{dest.suffix}")
    try:
        writer(tmp)
        os.replace(tmp, dest)
    except Exception:
        tmp.unlink(missing_ok=True)
        raise


def _save_npy(arr: np.ndarray, key: str) -> Path:
    """Atomically store *arr* under *key*; raise ``RuntimeError`` on failure."""
    dest = _npy_path(key)
    try:
        # The temp name ends in .npy, so numpy writes exactly that path.
        _atomic_write(dest, lambda tmp: np.save(tmp, arr))
    except Exception as exc:
        raise RuntimeError(f"_save_npy failed '{key}': {exc}") from exc
    logger.info(f" [cache ✓] {dest.name} shape={arr.shape} dtype={arr.dtype}")
    return dest


def _load_npy(key: str, expected_rows: int | None = None) -> np.ndarray | None:
    """Load the cached array for *key*, or return ``None``.

    A file that cannot be read, is empty, or whose first dimension differs
    from *expected_rows* is deleted and treated as a cache miss.
    """
    p = _npy_path(key)
    if not p.exists():
        return None
    try:
        arr = np.load(p, allow_pickle=False)
        if arr.size == 0:
            raise ValueError("empty array")
        if expected_rows is not None and arr.shape[0] != expected_rows:
            raise ValueError(f"rows {arr.shape[0]} ≠ {expected_rows}")
        logger.info(f" [cache ↑] {p.name} shape={arr.shape}")
        return arr
    except Exception as exc:
        logger.warning(f" [cache ✗] {p.name}: {exc} — deleting")
        p.unlink(missing_ok=True)
        return None


def _save_json(obj, key: str) -> Path:
    """Atomically store *obj* as UTF-8 JSON under *key*; raise ``RuntimeError`` on failure."""
    p = _json_path(key)
    try:
        _atomic_write(
            p,
            lambda tmp: tmp.write_text(
                json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8"),
        )
    except Exception as exc:
        raise RuntimeError(f"_save_json failed '{key}': {exc}") from exc
    logger.info(f" [cache ✓] {p.name}")
    return p


def _load_json(key: str):
    """Load the cached JSON value for *key*; delete unreadable files and return ``None``."""
    p = _json_path(key)
    if not p.exists():
        return None
    try:
        obj = json.loads(p.read_text(encoding="utf-8"))
        logger.info(f" [cache ↑] {p.name}")
        return obj
    except Exception as exc:
        logger.warning(f" [cache ✗] {p.name}: {exc} — deleting")
        p.unlink(missing_ok=True)
        return None


def _save_parquet(df: pd.DataFrame, key: str) -> Path:
    """Atomically store *df* (without index) as parquet under *key*."""
    dest = _parquet_path(key)
    try:
        _atomic_write(dest, lambda tmp: df.to_parquet(tmp, index=False))
    except Exception as exc:
        raise RuntimeError(f"_save_parquet failed '{key}': {exc}") from exc
    logger.info(f" [cache ✓] {dest.name} rows={len(df)}")
    return dest


def _text_fingerprint(texts: list[str]) -> str:
    """Return an 8-hex-char digest over the count and content of *texts*.

    Every text contributes to the digest, so editing any excerpt (not only
    the first or last one) yields a different fingerprint and therefore a
    different cache key.  Each text is length-prefixed so that moving
    characters between neighbouring texts also changes the digest.

    NOTE: cache files written with an older fingerprint scheme simply stop
    matching and are recomputed once.
    """
    digest = hashlib.md5()
    digest.update(str(len(texts)).encode())
    for text in texts:
        raw = text.encode("utf-8")
        digest.update(len(raw).to_bytes(8, "little"))
        digest.update(raw)
    return digest.hexdigest()[:8]


# =============================================================================
# DATA LOADING
# =============================================================================
def _download_csv(dest: Path, attempts: int = 4) -> None:
    """Fetch ``CSV_URL`` into *dest*, retrying with a growing back-off.

    The payload is written to a ``.part`` sibling and renamed only after a
    complete download, so an interrupted transfer can never be mistaken for
    the finished corpus on the next run.

    Raises:
        RuntimeError: if every attempt fails.
    """
    logger.info("Downloading corpus CSV from HuggingFace …")
    partial = dest.with_name(dest.name + ".part")
    for attempt in range(1, attempts + 1):
        try:
            urllib.request.urlretrieve(CSV_URL, partial)
            os.replace(partial, dest)
            logger.info(f" Downloaded → {dest}")
            return
        except Exception as exc:
            partial.unlink(missing_ok=True)
            logger.warning(f" Attempt {attempt} failed: {exc}")
            if attempt < attempts:  # no point sleeping after the last try
                time.sleep(3 * attempt)
    raise RuntimeError(f"Could not download CSV after {attempts} attempts.")


def _load_csv() -> pd.DataFrame:
    """Return the curriculum corpus as a cleaned, all-string DataFrame.

    Downloads the CSV on first use, normalises column names to
    ``lower_snake_case``, adds any missing optional columns as empty
    strings, adds ``search_term_lower`` and drops excerpts of 20 characters
    or fewer.

    Raises:
        RuntimeError: if the CSV cannot be downloaded.
        ValueError: if ``search_term`` or ``text_excerpt`` is missing.
    """
    local = DATA_DIR / "curriculum_excerpts.csv"
    if not local.exists():
        _download_csv(local)

    df = pd.read_csv(local, dtype=str).fillna("")
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

    for req in ("search_term", "text_excerpt"):
        if req not in df.columns:
            raise ValueError(
                f"CSV missing required column '{req}'. "
                f"Found: {list(df.columns)}")
    for opt in ("state", "subject", "grade", "school_type", "year", "file"):
        if opt not in df.columns:
            df[opt] = ""

    df["search_term_lower"] = df["search_term"].str.lower().str.strip()
    df["text_excerpt"] = df["text_excerpt"].str.strip()
    df = df[df["text_excerpt"].str.len() > 20].reset_index(drop=True)
    logger.info(f"CSV loaded: {len(df):,} rows | columns: {list(df.columns)}")
    return df


def _filter_concept(df: pd.DataFrame, concept: str) -> pd.DataFrame:
    """Return the rows of *df* belonging to *concept*.

    Exact matches on ``search_term_lower`` are preferred; when fewer than
    five exist, rows whose search term merely contains *concept* are used
    instead.  The containment test is a literal substring match, so regex
    metacharacters in *concept* have no special meaning.
    """
    sub = df[df["search_term_lower"] == concept].reset_index(drop=True)
    if len(sub) < 5:
        mask = df["search_term_lower"].str.contains(
            concept, na=False, regex=False)
        sub = df[mask].reset_index(drop=True)
    logger.info(f" [{concept}] {len(sub):,} rows after filtering")
    return sub


# =============================================================================
# SENTENCE-TRANSFORMER
# =============================================================================
def _get_model():
    """Return the shared SentenceTransformer, loading it on first call."""
    global _ST_MODEL
    if _ST_MODEL is None:
        logger.info(f"Loading SentenceTransformer '{MODEL_NAME}' …")
        # Imported lazily so the module loads without the heavy dependency.
        from sentence_transformers import SentenceTransformer
        _ST_MODEL = SentenceTransformer(MODEL_NAME)
        logger.info(" Model ready.")
    return _ST_MODEL


def compute_embeddings(texts: list[str], concept: str) -> np.ndarray:
    """
    Encode texts with L2-normalised embeddings.

    Cached by concept + n_texts + text fingerprint; the fingerprint covers
    every text, so any content change invalidates the cache.  Returns a
    float32 array with one row per text.
    """
    fp = _text_fingerprint(texts)
    key = f"emb_{concept}_{len(texts)}_{fp}"

    hit = _load_npy(key, expected_rows=len(texts))
    if hit is not None:
        logger.info(f" [{concept}] embeddings loaded from cache")
        return hit

    logger.info(f" [{concept}] encoding {len(texts):,} texts …")
    model = _get_model()
    arr = model.encode(
        texts,
        batch_size=32,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=True,
    ).astype(np.float32)
    _save_npy(arr, key)
    return arr


# =============================================================================
# BUILD PER-CONCEPT ARTEFACTS
# =============================================================================
def _build_concept_artefacts(
    sub: pd.DataFrame,
    embs: np.ndarray,
    concept: str,
) -> dict[str, Path]:
    """
    Produce the per-concept files and return a dict of {role: path}:

      embeddings        — the .npy written earlier by ``compute_embeddings``
      metadata_parquet  — one row per excerpt: all CSV columns + row_id,
                          concept, embedding_dim, n_texts, model, created_at
      metadata_json     — a preview only: schema + first 5 rows

    Existing parquet/JSON files for the same concept/count/fingerprint are
    reused rather than rewritten.

    Raises:
        ValueError: if *embs* is not 2-D with one row per row of *sub*
            (uploading misaligned embeddings would silently corrupt the
            dataset).
    """
    n = len(sub)
    if embs.ndim != 2 or embs.shape[0] != n:
        raise ValueError(
            f"[{concept}] embeddings shape {embs.shape} does not match {n} rows")
    fp = _text_fingerprint(sub["text_excerpt"].tolist())

    # ── embeddings ────────────────────────────────────────────────────────────
    emb_key = f"emb_{concept}_{n}_{fp}"
    emb_path = _npy_path(emb_key)  # already saved by compute_embeddings

    # ── metadata parquet ──────────────────────────────────────────────────────
    meta_key = f"meta_{concept}_{n}_{fp}"
    meta_path = _parquet_path(meta_key)
    if not meta_path.exists():
        meta_df = sub.copy()
        meta_df.insert(0, "row_id", range(n))
        meta_df.insert(1, "concept", concept)
        meta_df["embedding_dim"] = embs.shape[1]
        meta_df["n_texts"] = n
        meta_df["model"] = MODEL_NAME
        meta_df["created_at"] = datetime.now().isoformat(timespec="seconds")
        _save_parquet(meta_df, meta_key)
    else:
        logger.info(f" [{concept}] metadata parquet already cached")

    # ── metadata JSON (first 5 rows + schema) ─────────────────────────────────
    json_key = f"meta_preview_{concept}_{n}_{fp}"
    json_path = _json_path(json_key)
    if not json_path.exists():
        preview = {
            "concept": concept,
            "n_texts": n,
            "embedding_dim": int(embs.shape[1]),
            "model": MODEL_NAME,
            "created_at": datetime.now().isoformat(timespec="seconds"),
            "columns": list(sub.columns),
            "preview_rows": sub.head(5).to_dict(orient="records"),
        }
        _save_json(preview, json_key)
    else:
        logger.info(f" [{concept}] metadata JSON already cached")

    return {
        "embeddings": emb_path,
        "metadata_parquet": meta_path,
        "metadata_json": json_path,
    }


# =============================================================================
# HUGGINGFACE DATASET PUSH
# =============================================================================
def _push_concept_to_hf(
    concept: str,
    paths: dict[str, Path],
    token: str,
) -> str:
    """
    Upload a single concept's artefacts to the HF dataset repo.

    Remote layout:
      {concept}/embeddings.npy
      {concept}/metadata.parquet
      {concept}/metadata_preview.json

    Returns a one-line status string starting with "✓" when at least one
    file was uploaded and "✗" when nothing was uploaded or an error occurred.
    Never raises; the traceback of a failure goes to the log only.
    """
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=token)

        # Ensure the dataset repo exists (creates if needed)
        try:
            api.repo_info(repo_id=HF_DATASET_REPO, repo_type="dataset")
        except Exception:
            logger.info(f" Creating dataset repo '{HF_DATASET_REPO}' …")
            api.create_repo(
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                private=False,
                exist_ok=True,
            )

        uploads = [
            (paths["embeddings"], f"{concept}/embeddings.npy"),
            (paths["metadata_parquet"], f"{concept}/metadata.parquet"),
            (paths["metadata_json"], f"{concept}/metadata_preview.json"),
        ]
        uploaded = 0
        for local_path, remote_path in uploads:
            if not local_path.is_file():
                logger.warning(f" Skipping missing file: {local_path}")
                continue
            logger.info(f" Uploading {local_path.name} → {remote_path} …")
            api.upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=remote_path,
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                commit_message=(
                    f"[{concept}] update embeddings "
                    f"{datetime.now().isoformat(timespec='minutes')}"
                ),
            )
            uploaded += 1
            logger.info(f" ✓ {remote_path}")

        # Do not claim success when every file was skipped.
        if uploaded == 0:
            return f"✗ [{concept}] nothing uploaded — no artefact files found"
        skipped = len(uploads) - uploaded
        if skipped:
            return (f"✓ [{concept}] pushed to {HF_DATASET_REPO} "
                    f"({skipped} file(s) missing)")
        return f"✓ [{concept}] pushed to {HF_DATASET_REPO}"

    except Exception as exc:
        msg = f"✗ [{concept}] HF push failed: {exc}"
        logger.error(f"{msg}\n{traceback.format_exc()}")
        return msg


def _push_dataset_card(token: str, summary: dict) -> None:
    """Write/update a README.md dataset card on HF.

    *summary* maps concept name to a dict providing at least ``n`` and
    ``dim``.  Failures are logged as warnings and never raised.
    """
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=token)

        lines = [
            "---",
            "license: cc-by-4.0",
            "language:",
            "- de",
            "tags:",
            "- embeddings",
            "- curriculum",
            "- education",
            "- german",
            "- sentence-transformers",
            "---",
            "",
            "# German Curriculum Concept Embeddings",
            "",
            "Sentence embeddings for three focus concepts from the German school "
            "curriculum analysis project.",
            "",
            f"**Model:** `{MODEL_NAME}` ",
            f"**Generated:** {datetime.now().isoformat(timespec='seconds')} ",
            "",
            "## Structure",
            "",
            "```",
            "concept/",
            " embeddings.npy # float32 (N, 768) L2-normalised",
            " metadata.parquet # one row per excerpt, all CSV columns",
            " metadata_preview.json # schema + first 5 rows",
            "```",
            "",
            "## Concepts",
            "",
        ]
        for concept, info in summary.items():
            lines.append(
                f"### `{concept}` "
                f"— {info['n']:,} excerpts · dim={info['dim']}"
            )
        lines += [
            "",
            "## Usage",
            "",
            "```python",
            "import numpy as np",
            "import pandas as pd",
            "from huggingface_hub import hf_hub_download",
            "",
            "concept = 'evolution'",
            "",
            "emb_path = hf_hub_download(",
            f' repo_id="{HF_DATASET_REPO}",',
            ' repo_type="dataset",',
            ' filename=f"{concept}/embeddings.npy")',
            "",
            "meta_path = hf_hub_download(",
            f' repo_id="{HF_DATASET_REPO}",',
            ' repo_type="dataset",',
            ' filename=f"{concept}/metadata.parquet")',
            "",
            "embs = np.load(emb_path) # (N, 768)",
            "meta = pd.read_parquet(meta_path) # N rows",
            "```",
            "",
            "## Source",
            "",
            "Generated by the "
            "[Concept Atlas Space]"
            "(https://huggingface.co/spaces/deirdosh/curriculum_analysis_german).",
        ]
        readme = "\n".join(lines)
        api.upload_file(
            path_or_fileobj=readme.encode("utf-8"),
            path_in_repo="README.md",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            commit_message="update dataset card",
        )
        logger.info(" Dataset card (README.md) updated.")
    except Exception as exc:
        logger.warning(f" Dataset card update failed: {exc}")
# =============================================================================
# PIPELINE STATE CACHE (lightweight JSON)
# =============================================================================
_META_KEY = "embedding_pipeline_meta_v1"

# Keys a concept_meta entry must carry before its artefacts can be uploaded.
_REQUIRED_META_KEYS = ("n", "fp", "emb_path", "meta_path")


def _save_state() -> None:
    """Persist timestamp, concepts_done, concept_meta and hf_status from ``_P``."""
    state = {
        "timestamp": _P.get("timestamp", ""),
        "concepts_done": _P.get("concepts_done", []),
        "concept_meta": _P.get("concept_meta", {}),
        "hf_status": _P.get("hf_status", {}),
    }
    _save_json(state, _META_KEY)


def _load_state() -> bool:
    """Restore a previously saved state into ``_P``.

    Returns ``True`` when a non-empty JSON object was found and merged,
    ``False`` otherwise (nothing cached, empty, or not a JSON object).
    """
    state = _load_json(_META_KEY)
    if not isinstance(state, dict) or not state:
        return False
    _P.update({
        "timestamp": state.get("timestamp", ""),
        "concepts_done": state.get("concepts_done", []),
        "concept_meta": state.get("concept_meta", {}),
        "hf_status": state.get("hf_status", {}),
    })
    logger.info(f" Prior run restored: {_P['concepts_done']} [{_P['timestamp']}]")
    return True


# =============================================================================
# PIPELINE WORKER
# =============================================================================
def _pipeline_worker(token: str) -> None:
    """Run the full pipeline: load CSV, embed each concept, push, save state.

    Intended to run in a background thread.  Any exception is logged rather
    than raised, and ``_PIPELINE_RUNNING`` is always cleared on exit.  An
    empty *token* skips the upload step.
    """
    try:
        # ── Load corpus ───────────────────────────────────────────────────────
        logger.info("━" * 56)
        logger.info("STEP 1/4 Loading corpus CSV")
        logger.info("━" * 56)
        df = _load_csv()
        _P["df"] = df

        # Start from copies of whatever a prior run left behind.
        concepts_done = list(_P.get("concepts_done", []))
        concept_meta = dict(_P.get("concept_meta", {}))
        hf_status = dict(_P.get("hf_status", {}))

        # ── Per-concept embedding ─────────────────────────────────────────────
        logger.info("━" * 56)
        logger.info("STEP 2/4 Computing embeddings per concept")
        logger.info("━" * 56)

        for concept in FOCUS_CONCEPTS:
            logger.info(f"\n[{concept.upper()}] ── filtering …")
            sub = _filter_concept(df, concept)
            n = len(sub)
            if n < 5:
                logger.warning(f" [{concept}] only {n} rows — skipping")
                continue

            texts = sub["text_excerpt"].tolist()

            logger.info(f"[{concept.upper()}] ── embeddings ({n:,} texts) …")
            embs = compute_embeddings(texts, concept)

            logger.info(f"[{concept.upper()}] ── building artefact files …")
            paths = _build_concept_artefacts(sub, embs, concept)

            # store in-memory for status display
            concept_meta[concept] = {
                "n": n,
                "dim": int(embs.shape[1]),
                "fp": _text_fingerprint(texts),
                "emb_path": str(paths["embeddings"]),
                "meta_path": str(paths["metadata_parquet"]),
            }
            if concept not in concepts_done:
                concepts_done.append(concept)

            _P.update(dict(
                concepts_done=concepts_done,
                concept_meta=concept_meta,
            ))
            _save_state()  # checkpoint after each concept

        # ── Push to HF ────────────────────────────────────────────────────────
        logger.info("━" * 56)
        logger.info("STEP 3/4 Pushing to HuggingFace dataset")
        logger.info("━" * 56)

        if not token:
            logger.warning(
                " HF_TOKEN not provided — skipping upload.\n"
                " Set HF_TOKEN as a Space secret or pass it in the UI.")
        else:
            for concept in concepts_done:
                logger.info(f"\n[{concept.upper()}] ── uploading …")
                meta = concept_meta.get(concept) or {}
                # A restored state may list a concept without complete
                # metadata; skip it instead of aborting the whole run.
                missing = [k for k in _REQUIRED_META_KEYS if k not in meta]
                if missing:
                    result = (f"✗ [{concept}] skipped — state lacks "
                              f"{', '.join(missing)}")
                    hf_status[concept] = result
                    logger.warning(f" {result}")
                    continue
                paths_for_push = {
                    "embeddings": Path(meta["emb_path"]),
                    "metadata_parquet": Path(meta["meta_path"]),
                    "metadata_json": _json_path(
                        f"meta_preview_{concept}_{meta['n']}_{meta['fp']}"),
                }
                result = _push_concept_to_hf(concept, paths_for_push, token)
                hf_status[concept] = result
                logger.info(f" {result}")

            # dataset card
            logger.info("\nUpdating dataset README …")
            _push_dataset_card(token, concept_meta)

            _P.update(dict(hf_status=hf_status))

        # ── Finalise ──────────────────────────────────────────────────────────
        logger.info("━" * 56)
        logger.info("STEP 4/4 Saving final state")
        logger.info("━" * 56)
        _P["timestamp"] = datetime.now().isoformat(timespec="seconds")
        _save_state()

        logger.info("\n" + "═" * 56)
        logger.info("✓ PIPELINE COMPLETE")
        logger.info(f" Timestamp : {_P['timestamp']}")
        for concept in concepts_done:
            m = concept_meta.get(concept) or {}
            logger.info(
                f" {concept:12s}: {m.get('n', 0):,} texts · "
                f"dim={m.get('dim', '—')} · "
                f"HF={str(hf_status.get(concept, 'not pushed'))[:40]}")
        logger.info("═" * 56)

    except Exception:
        logger.error(f"Pipeline error:\n{traceback.format_exc()}")
    finally:
        _PIPELINE_RUNNING.clear()
# =============================================================================
# PUBLIC API (called by Gradio buttons)
# =============================================================================
# Serialises the "is it running? → mark running" check in launch_pipeline so
# two near-simultaneous clicks cannot both start a worker.
_LAUNCH_LOCK = threading.Lock()


def launch_pipeline(hf_token: str) -> None:
    """Start the embedding pipeline in a background thread.

    Does nothing (beyond logging) when a run is already in progress.  A
    blank or missing *hf_token* falls back to the ``HF_TOKEN`` environment
    variable.
    """
    global _PIPELINE_THREAD
    with _LAUNCH_LOCK:
        if _PIPELINE_RUNNING.is_set():
            logger.info("Pipeline already running — wait for it to finish.")
            return
        _PIPELINE_RUNNING.set()

    logger.info("⏳ Pipeline launched …")
    token = (hf_token or "").strip() or os.environ.get("HF_TOKEN", "")
    _PIPELINE_THREAD = threading.Thread(
        target=_pipeline_worker,
        args=(token,),
        name="pipeline",
        daemon=True,
    )
    try:
        _PIPELINE_THREAD.start()
    except Exception:
        # The worker's ``finally`` never runs if the thread did not start,
        # so release the flag here or no later run could ever be launched.
        _PIPELINE_RUNNING.clear()
        raise


def _md_cell(text, limit: int = 50) -> str:
    """Reduce *text* to a single, table-safe markdown cell of at most *limit* chars."""
    rows = str(text).splitlines()
    first = rows[0] if rows else ""
    # A raw "|" or newline would break the surrounding markdown table.
    return first[:limit].replace("|", "\\|")


def get_status_md() -> str:
    """Markdown summary card shown in the UI."""
    if not _P.get("concepts_done"):
        return "_No pipeline run yet — click **▶ Run** to start._"

    lines = [
        f"**Last run:** {_P.get('timestamp','—')}",
        "",
        "| Concept | N texts | Dim | Local cache | HF status |",
        "|---|---|---|---|---|",
    ]
    for concept in FOCUS_CONCEPTS:
        meta = _P.get("concept_meta", {}).get(concept, {})
        n = f"{meta.get('n', 0):,}" if meta else "—"
        dim = str(meta.get("dim", "—"))
        emb_path = meta.get("emb_path")
        cached = "✓" if emb_path and Path(emb_path).is_file() else "—"
        hfs = _md_cell(_P.get("hf_status", {}).get(concept, "—"))
        lines.append(f"| {concept.capitalize()} | {n} | {dim} | {cached} | {hfs} |")

    # dataset link
    lines += [
        "",
        f"**Dataset:** [{HF_DATASET_REPO}]"
        f"(https://huggingface.co/datasets/{HF_DATASET_REPO})",
    ]
    return "\n".join(lines)


def get_cache_inventory() -> str:
    """List all regular files in the cache and data directories as markdown."""
    entries = sorted(CACHE_DIR.glob("*")) + sorted(DATA_DIR.glob("*"))
    # Only regular files are counted and listed; sub-directories are ignored.
    stats = [(f, f.stat()) for f in entries if f.is_file()]
    if not stats:
        return "_Cache is empty._"

    total = sum(st.st_size for _, st in stats)
    lines = [f"### {len(stats)} cached files ({total/1_048_576:.2f} MB total)", ""]
    for f, st in stats:
        sz = st.st_size / 1024
        ts = datetime.fromtimestamp(st.st_mtime).strftime("%m-%d %H:%M")
        lines.append(f"- `{f.name}` — {sz:.1f} KB · {ts}")
    return "\n".join(lines)


def _resolve_artefact(concept: str, meta_field: str, pattern: str) -> str | None:
    """Locate a cached artefact for *concept*.

    Prefers the path recorded in ``_P["concept_meta"][concept][meta_field]``;
    otherwise falls back to the newest regular file in ``CACHE_DIR`` matching
    *pattern*.  Returns ``None`` when nothing is found.
    """
    recorded = _P.get("concept_meta", {}).get(concept, {}).get(meta_field, "")
    # An empty string must not be wrapped in Path(): Path("") is "." and
    # exists, which would hand the current directory to gr.File.
    if recorded and Path(recorded).is_file():
        return str(Path(recorded))

    candidates = [
        p for p in CACHE_DIR.glob(pattern)
        # Ignore half-written temp files left behind by an interrupted save.
        if p.is_file() and not p.stem.endswith("__tmp")
    ]
    if not candidates:
        return None
    return str(max(candidates, key=lambda p: p.stat().st_mtime))


def download_concept_embeddings(concept: str) -> str | None:
    """Return path to the concept's .npy embedding file for gr.File."""
    return _resolve_artefact(concept, "emb_path", f"emb_{concept}_*__*.npy")


def download_concept_metadata(concept: str) -> str | None:
    """Return path to the concept's .parquet metadata file for gr.File."""
    return _resolve_artefact(concept, "meta_path", f"meta_{concept}_*__*.parquet")


def download_all_zip() -> str | None:
    """Bundle all concept artefacts (and the log file) into a single ZIP.

    Returns the ZIP path inside ``CACHE_DIR``, or ``None`` when there is
    nothing to bundle.
    """
    import zipfile

    files: list[Path] = []
    for concept in FOCUS_CONCEPTS:
        emb = download_concept_embeddings(concept)
        mta = download_concept_metadata(concept)
        if emb:
            files.append(Path(emb))
        if mta:
            files.append(Path(mta))
        # JSON preview
        meta = _P.get("concept_meta", {}).get(concept, {})
        if meta:
            jp = _json_path(
                f"meta_preview_{concept}_{meta.get('n',0)}_{meta.get('fp','')}")
            if jp.is_file():
                files.append(jp)

    if LOG_FILE.is_file():
        files.append(LOG_FILE)
    if not files:
        return None

    out = CACHE_DIR / f"curriculum_embeddings_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in files:
            zf.write(p, p.name)
    logger.info(f" ZIP → {out.name} {out.stat().st_size//1024} KB")
    return str(out)


# =============================================================================
# GRADIO UI
# =============================================================================
# NOTE(review): this text is passed to gr.HTML but contains no tags in this
# copy of the file — the markup appears to have been stripped; confirm against
# the deployed Space before shipping.
_HDR = """

🔭 Concept Atlas — Embedding Generator

Generates sentence embeddings for mensch · verhalten · evolution and pushes them to a public HuggingFace dataset.

Model: paraphrase-multilingual-mpnet-base-v2  |  Dataset: deirdosh/curriculum_embeddings

"""

_INSTRUCTIONS = """
### What this app does

1. Downloads the German curriculum corpus CSV (~35k excerpts)
2. Filters excerpts for each of the three focus concepts
3. Encodes every excerpt with `paraphrase-multilingual-mpnet-base-v2` (768-dim, L2-normalised, float32)
4. Saves per-concept artefacts:
   - `embeddings.npy` — shape `(N, 768)`
   - `metadata.parquet` — all CSV columns + `row_id`, `concept`, `model`, timestamps
   - `metadata_preview.json` — schema + first 5 rows
5. Pushes all artefacts to `huggingface.co/datasets/deirdosh/curriculum_embeddings`

### How to use

| Step | Action |
|---|---|
| 1 | Paste your `HF_TOKEN` (needs *write* access to the dataset repo) |
| 2 | Click **▶ Run Pipeline** |
| 3 | Watch the live log — each concept takes ~10 min on CPU |
| 4 | Download individual files or the full ZIP below |

> All steps are individually cached. Re-running skips already-computed embeddings.
> The HF_TOKEN can also be set as a **Space secret** — leave the field blank if so.
"""

_CSS = """
body,.gradio-container{background:#0F1120!important;}
.gr-button-primary{background:#6D28D9!important;border-color:#6D28D9!important;color:#fff!important;}
.gr-button-primary:hover{background:#5B21B6!important;}
.gr-button-secondary{background:#1E293B!important;border-color:#334155!important;color:#94A3B8!important;}
.gr-button-secondary:hover{background:#334155!important;color:#E2E8F0!important;}
.gr-textbox textarea{background:#0F1120!important;color:#E2E8F0!important;
border-color:#2D3555!important;font-family:monospace!important;font-size:12px!important;}
label{color:#94A3B8!important;}
.gr-markdown{color:#CBD5E1!important;}
.gr-markdown h3{color:#A78BFA!important;}
.gr-markdown code{background:#1E293B!important;color:#C084FC!important;}
footer{display:none!important;}
"""


def build_ui() -> gr.Blocks:
    """Assemble the three-tab Gradio interface (Run, Status, Download)."""
    with gr.Blocks(
        title="Concept Atlas — Embeddings",
        css=_CSS,
        theme=gr.themes.Base(
            primary_hue="violet",
            secondary_hue="emerald",
            neutral_hue="slate",
            font=[gr.themes.GoogleFont("Inter"), "system-ui"],
        ),
    ) as demo:

        gr.HTML(_HDR)

        with gr.Tabs():

            # ── 0: Run ───────────────────────────────────────────────────────
            with gr.TabItem("🚀 Run Pipeline"):
                gr.Markdown(_INSTRUCTIONS)
                with gr.Row():
                    token_box = gr.Textbox(
                        label="HuggingFace token (write access)",
                        placeholder="hf_… (or leave blank if HF_TOKEN secret is set)",
                        type="password",
                        scale=3,
                    )
                    run_btn = gr.Button("▶ Run Pipeline", variant="primary", scale=1)

                # Passing the callable (not its result) lets Gradio re-poll it.
                live_log = gr.Textbox(
                    label="Live pipeline log",
                    value=get_live_log,
                    interactive=False,
                    lines=28,
                    every=3,
                )
                run_btn.click(fn=launch_pipeline, inputs=token_box, outputs=None)

            # ── 1: Status ────────────────────────────────────────────────────
            with gr.TabItem("📋 Status"):
                status_btn = gr.Button("Refresh status", variant="secondary")
                # Auto-refresh the visible card every 5 s (previously only a
                # hidden duplicate component was polled, so the card on screen
                # never updated by itself).
                status_md = gr.Markdown(value=get_status_md, every=5)
                status_btn.click(fn=get_status_md, outputs=status_md)

                gr.Markdown("---")
                cache_btn = gr.Button("Refresh cache inventory", variant="secondary")
                cache_md = gr.Markdown()
                cache_btn.click(fn=get_cache_inventory, outputs=cache_md)

            # ── 2: Download ───────────────────────────────────────────────────
            with gr.TabItem("⬇ Download"):
                gr.Markdown(
                    "### Download artefacts\n"
                    "Files are generated by the pipeline and cached locally. "
                    "They are also available at "
                    f"[{HF_DATASET_REPO}]"
                    f"(https://huggingface.co/datasets/{HF_DATASET_REPO})."
                )

                # ── per-concept ───────────────────────────────────────────────
                # NOTE(review): `colour` is unused below; presumably it styled
                # markup that is missing from this copy — confirm.
                for concept, emoji, colour in [
                    ("mensch", "🔵", "#C084FC"),
                    ("verhalten", "🟢", "#34D399"),
                    ("evolution", "🟠", "#FB923C"),
                ]:
                    gr.Markdown(
                        f"#### {emoji} `{concept}` "
                        f"embeddings")
                    with gr.Row():
                        be = gr.Button(f"Download {concept} embeddings (.npy)",
                                       variant="primary")
                        bm = gr.Button(f"Download {concept} metadata (.parquet)",
                                       variant="secondary")
                    with gr.Row():
                        fe = gr.File(label=f"{concept}_embeddings.npy")
                        fm = gr.File(label=f"{concept}_metadata.parquet")

                    # `c=concept` binds the current value; a plain closure
                    # would see only the last concept of the loop.
                    be.click(fn=lambda c=concept: download_concept_embeddings(c),
                             outputs=fe)
                    bm.click(fn=lambda c=concept: download_concept_metadata(c),
                             outputs=fm)

                gr.Markdown("---")
                gr.Markdown("#### 📦 Download everything")
                with gr.Row():
                    zip_btn = gr.Button("Download all artefacts as ZIP",
                                        variant="primary")
                    zip_file = gr.File(label="curriculum_embeddings_*.zip")
                zip_btn.click(fn=download_all_zip, outputs=zip_file)

                gr.Markdown(
                    "---\n"
                    "### Dataset usage\n"
                    "```python\n"
                    "import numpy as np\n"
                    "import pandas as pd\n"
                    "from huggingface_hub import hf_hub_download\n\n"
                    'concept = "evolution"\n\n'
                    "emb_path = hf_hub_download(\n"
                    f' repo_id="{HF_DATASET_REPO}",\n'
                    ' repo_type="dataset",\n'
                    ' filename=f"{concept}/embeddings.npy")\n\n'
                    "meta_path = hf_hub_download(\n"
                    f' repo_id="{HF_DATASET_REPO}",\n'
                    ' repo_type="dataset",\n'
                    ' filename=f"{concept}/metadata.parquet")\n\n'
                    "embs = np.load(emb_path) # (N, 768) float32\n"
                    "meta = pd.read_parquet(meta_path) # N rows\n"
                    "```"
                )

        # NOTE(review): footer markup (wrapper element and the "Dataset" link
        # target) appears to be missing from this copy — confirm and restore.
        gr.HTML(
            "\n"
            "Concept Atlas · OpenEvo/CCS · "
            "Dataset"
            "\n"
        )

    return demo


# =============================================================================
# STARTUP — try to restore a prior run's state
# =============================================================================
def _startup() -> None:
    """Log a startup banner and try to restore the last saved pipeline state."""
    logger.info("=" * 56)
    logger.info("Concept Atlas — Embedding Generator — startup")
    logger.info("=" * 56)
    try:
        ok = _load_state()
        if ok:
            logger.info(f" Prior state restored: {_P.get('concepts_done')}")
        else:
            logger.info(" No prior cache — fresh start.")
    except Exception:
        # A broken state file must not prevent the app from starting.
        logger.warning(f" Startup restore error:\n{traceback.format_exc()}")


_startup()

# =============================================================================
if __name__ == "__main__":
    demo = build_ui()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )