Spaces:
Sleeping
Sleeping
| """ | |
| Concept Atlas — Embedding Generator | |
| ===================================== | |
| Generates sentence embeddings for each focus concept (mensch, verhalten, evolution) | |
| and pushes embeddings + metadata to a HuggingFace public dataset. | |
| Space: https://huggingface.co/spaces/deirdosh/curriculum_analysis_german | |
| Dataset: https://huggingface.co/datasets/deirdosh/curriculum_embeddings | |
| """ | |
| # ── stdlib ─────────────────────────────────────────────────────────────────── | |
| import os, sys, json, hashlib, warnings, logging, traceback, time, threading | |
| import shutil | |
| import urllib.request | |
| from pathlib import Path | |
| from datetime import datetime | |
| warnings.filterwarnings("ignore") | |
# Root logging configuration: INFO level, mirrored to stdout and appended to
# a UTF-8 log file in the current working directory.
LOG_FILE = Path("pipeline.log")
_stdout_handler = logging.StreamHandler(sys.stdout)
_file_handler = logging.FileHandler(str(LOG_FILE), mode="a", encoding="utf-8")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[_stdout_handler, _file_handler],
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
| # ── third-party ────────────────────────────────────────────────────────────── | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| # ── lazy heavy imports ──────────────────────────────────────────────────────── | |
| _ST_MODEL = None | |
| # ── directories ────────────────────────────────────────────────────────────── | |
| CACHE_DIR = Path("cache") | |
| DATA_DIR = Path("data") | |
| for _d in (CACHE_DIR, DATA_DIR): | |
| _d.mkdir(parents=True, exist_ok=True) | |
| # ── constants ───────────────────────────────────────────────────────────────── | |
| FOCUS_CONCEPTS = ["mensch", "verhalten", "evolution"] | |
| MODEL_NAME = "paraphrase-multilingual-mpnet-base-v2" | |
| # Source corpus CSV (this Space's own file) | |
| CSV_URL = ( | |
| "https://huggingface.co/spaces/deirdosh/curriculum_analysis_german" | |
| "/resolve/main/data/curriculum_excerpts.csv" | |
| ) | |
| # Target HuggingFace dataset repository | |
| HF_DATASET_REPO = "deirdosh/curriculum_embeddings" | |
| # ── pipeline state ───────────────────────────────────────────────────────── | |
| _P: dict = {} | |
| _PIPELINE_RUNNING = threading.Event() | |
| _PIPELINE_THREAD: threading.Thread | None = None | |
| # Live log ring-buffer for the UI ticker | |
| _LOG_LINES: list[str] = [] | |
| _LOG_LOCK = threading.Lock() | |
| _MAX_LOG = 400 | |
class _UILogHandler(logging.Handler):
    """Logging handler that copies formatted records into ``_LOG_LINES``.

    The buffer is capped at ``_MAX_LOG`` entries; the oldest lines are
    dropped first. All buffer access happens under ``_LOG_LOCK``.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and append it to the shared log buffer.

        Any exception raised while formatting or storing the record is routed
        to ``handleError`` (the standard ``logging.Handler`` convention) so a
        malformed log call cannot crash the code that issued it.
        """
        try:
            # Format outside the lock so the critical section stays short.
            line = self.format(record)
            with _LOG_LOCK:
                _LOG_LINES.append(line)
                overflow = len(_LOG_LINES) - _MAX_LOG
                if overflow > 0:
                    # Drop all excess lines in one slice delete.
                    del _LOG_LINES[:overflow]
        except Exception:
            self.handleError(record)
# Register the UI handler on this module's logger, using a compact
# time-only prefix for each line.
_ui_formatter = logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
_ui_handler = _UILogHandler()
_ui_handler.setFormatter(_ui_formatter)
logging.getLogger(__name__).addHandler(_ui_handler)
def get_live_log() -> str:
    """Return the newest 100 buffered log lines joined with newlines."""
    with _LOG_LOCK:
        # Slicing copies the tail, so the join can run outside the lock.
        tail = _LOG_LINES[-100:]
    return "\n".join(tail)
| # ============================================================================= | |
| # ATOMIC CACHE HELPERS | |
| # ============================================================================= | |
def _ckey(logical: str) -> str:
    """Return the first 10 hex characters of the MD5 digest of *logical*."""
    digest = hashlib.md5(logical.encode())
    return digest.hexdigest()[:10]
def _cache_path(key: str, suffix: str) -> Path:
    """Build the cache file path for *key* with the given file *suffix*.

    The file name is the key with every character other than alphanumerics,
    ``-`` and ``_`` replaced by ``_`` (truncated to 60 characters), followed
    by ``__`` and the short hash from ``_ckey(key)``.
    """
    safe = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in key)[:60]
    return CACHE_DIR / f"{safe}__{_ckey(key)}{suffix}"


def _npy_path(key: str) -> Path:
    """Return the ``.npy`` cache path for *key*."""
    return _cache_path(key, ".npy")


def _json_path(key: str) -> Path:
    """Return the ``.json`` cache path for *key*."""
    return _cache_path(key, ".json")


def _parquet_path(key: str) -> Path:
    """Return the ``.parquet`` cache path for *key*."""
    return _cache_path(key, ".parquet")
def _save_npy(arr: np.ndarray, key: str) -> Path:
    """Write *arr* to the ``.npy`` cache file for *key* and return its path.

    The array is first written to a sibling ``…__tmp.npy`` file and then
    swapped into place with ``os.replace``, which overwrites an existing
    destination in a single rename (``shutil.move`` may fail or fall back to
    a copy in that situation).

    Raises:
        RuntimeError: if writing or renaming fails; the temp file is removed.
    """
    dest = _npy_path(key)
    tmp = dest.with_name(dest.stem + "__tmp.npy")
    try:
        np.save(tmp, arr)  # numpy writes exactly tmp (ends in .npy)
        os.replace(tmp, dest)
        logger.info(f" [cache ✓] {dest.name} shape={arr.shape} dtype={arr.dtype}")
        return dest
    except Exception as exc:
        tmp.unlink(missing_ok=True)
        raise RuntimeError(f"_save_npy failed '{key}': {exc}") from exc
def _load_npy(key: str, expected_rows: int | None = None) -> np.ndarray | None:
    """Load the cached array for *key*, or return ``None`` if unusable.

    A cache file that cannot be read, is empty, or whose first dimension
    differs from *expected_rows* (when given) is deleted and ``None`` is
    returned.
    """
    path = _npy_path(key)
    if not path.exists():
        return None
    try:
        loaded = np.load(path, allow_pickle=False)
        if loaded.size == 0:
            raise ValueError("empty array")
        row_mismatch = (
            expected_rows is not None and loaded.shape[0] != expected_rows
        )
        if row_mismatch:
            raise ValueError(f"rows {loaded.shape[0]} ≠ {expected_rows}")
        logger.info(f" [cache ↑] {path.name} shape={loaded.shape}")
    except Exception as exc:
        logger.warning(f" [cache ✗] {path.name}: {exc} — deleting")
        path.unlink(missing_ok=True)
        return None
    else:
        return loaded
def _save_json(obj, key: str) -> Path:
    """Serialise *obj* to the ``.json`` cache file for *key* and return its path.

    The JSON text (UTF-8, non-ASCII preserved, 2-space indent) is written to
    a sibling ``…__tmp.json`` file and then swapped into place with
    ``os.replace`` so an existing file is overwritten in a single rename.

    Raises:
        RuntimeError: if serialising, writing or renaming fails; the temp
            file is removed.
    """
    p = _json_path(key)
    tmp = p.with_name(p.stem + "__tmp.json")
    try:
        tmp.write_text(
            json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
        os.replace(tmp, p)
        logger.info(f" [cache ✓] {p.name}")
        return p
    except Exception as exc:
        tmp.unlink(missing_ok=True)
        raise RuntimeError(f"_save_json failed '{key}': {exc}") from exc
def _load_json(key: str):
    """Return the parsed JSON cached under *key*, or ``None``.

    ``None`` is returned when the file does not exist. A file that cannot be
    read or parsed is deleted and ``None`` is returned.
    """
    path = _json_path(key)
    if not path.exists():
        return None
    try:
        raw = path.read_text(encoding="utf-8")
        parsed = json.loads(raw)
        logger.info(f" [cache ↑] {path.name}")
    except Exception as exc:
        logger.warning(f" [cache ✗] {path.name}: {exc} — deleting")
        path.unlink(missing_ok=True)
        return None
    else:
        return parsed
def _save_parquet(df: pd.DataFrame, key: str) -> Path:
    """Write *df* (without its index) to the ``.parquet`` cache file for *key*.

    The frame is written to a sibling ``…__tmp.parquet`` file and then
    swapped into place with ``os.replace`` so an existing file is overwritten
    in a single rename.

    Returns:
        The final path of the parquet file.

    Raises:
        RuntimeError: if writing or renaming fails; the temp file is removed.
    """
    dest = _parquet_path(key)
    tmp = dest.with_name(dest.stem + "__tmp.parquet")
    try:
        df.to_parquet(tmp, index=False)
        os.replace(tmp, dest)
        logger.info(f" [cache ✓] {dest.name} rows={len(df)}")
        return dest
    except Exception as exc:
        tmp.unlink(missing_ok=True)
        raise RuntimeError(f"_save_parquet failed '{key}': {exc}") from exc
def _text_fingerprint(texts: list[str]) -> str:
    """Return an 8-hex-character fingerprint of the full contents of *texts*.

    Every text contributes to the digest, so a change to any element (not
    just the first or last) yields a different fingerprint and therefore a
    different cache key. Each text is framed with its byte length so that
    differently split lists such as ``["ab", "c"]`` and ``["a", "bc"]`` do
    not hash to the same value.
    """
    digest = hashlib.md5()
    digest.update(str(len(texts)).encode())
    for text in texts:
        encoded = text.encode("utf-8")
        # Length prefix keeps element boundaries unambiguous.
        digest.update(f"\x00{len(encoded)}:".encode())
        digest.update(encoded)
    return digest.hexdigest()[:8]
| # ============================================================================= | |
| # DATA LOADING | |
| # ============================================================================= | |
def _load_csv() -> pd.DataFrame:
    """Load the curriculum corpus CSV, downloading it first if necessary.

    The file is fetched from ``CSV_URL`` into ``DATA_DIR`` when no local copy
    exists (up to 4 attempts with increasing pauses between them). The
    download goes to a ``.part`` file that is renamed into place only after
    it completes, so an interrupted transfer never leaves a truncated CSV
    that later runs would mistake for the real corpus.

    After loading, column names are stripped, lower-cased and have spaces
    replaced by underscores; missing optional columns are added as empty
    strings; a lower-cased ``search_term_lower`` column is added; and rows
    whose stripped ``text_excerpt`` has 20 characters or fewer are dropped.

    Raises:
        RuntimeError: if the CSV cannot be downloaded after 4 attempts.
        ValueError: if ``search_term`` or ``text_excerpt`` is missing.
    """
    local = DATA_DIR / "curriculum_excerpts.csv"
    if not local.exists():
        logger.info("Downloading corpus CSV from HuggingFace …")
        partial = local.with_name(local.name + ".part")
        max_attempts = 4
        for attempt in range(1, max_attempts + 1):
            try:
                urllib.request.urlretrieve(CSV_URL, partial)
                os.replace(partial, local)
                logger.info(f" Downloaded → {local}")
                break
            except Exception as exc:
                partial.unlink(missing_ok=True)
                logger.warning(f" Attempt {attempt} failed: {exc}")
                # No point pausing after the final attempt.
                if attempt < max_attempts:
                    time.sleep(3 * attempt)
        else:
            raise RuntimeError("Could not download CSV after 4 attempts.")
    df = pd.read_csv(local, dtype=str).fillna("")
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
    for req in ("search_term", "text_excerpt"):
        if req not in df.columns:
            raise ValueError(
                f"CSV missing required column '{req}'. "
                f"Found: {list(df.columns)}")
    for opt in ("state", "subject", "grade", "school_type", "year", "file"):
        if opt not in df.columns:
            df[opt] = ""
    df["search_term_lower"] = df["search_term"].str.lower().str.strip()
    df["text_excerpt"] = df["text_excerpt"].str.strip()
    df = df[df["text_excerpt"].str.len() > 20].reset_index(drop=True)
    logger.info(f"CSV loaded: {len(df):,} rows | columns: {list(df.columns)}")
    return df
def _filter_concept(df: pd.DataFrame, concept: str) -> pd.DataFrame:
    """Return the rows of *df* belonging to *concept*, with a fresh index.

    Rows whose ``search_term_lower`` equals *concept* are selected first. If
    fewer than 5 rows match exactly, the selection falls back to rows whose
    ``search_term_lower`` contains *concept* as a literal substring
    (``regex=False``, so characters in *concept* are never interpreted as a
    regular-expression pattern).
    """
    sub = df[df["search_term_lower"] == concept].reset_index(drop=True)
    if len(sub) < 5:
        sub = df[df["search_term_lower"].str.contains(
            concept, na=False, regex=False)].reset_index(drop=True)
    logger.info(f" [{concept}] {len(sub):,} rows after filtering")
    return sub
| # ============================================================================= | |
| # SENTENCE-TRANSFORMER | |
| # ============================================================================= | |
def _get_model():
    """Return the shared SentenceTransformer, creating it on first use.

    The ``sentence_transformers`` import is deferred until the model is
    actually needed; the instance is kept in the module-level ``_ST_MODEL``.
    """
    global _ST_MODEL
    if _ST_MODEL is not None:
        return _ST_MODEL
    logger.info(f"Loading SentenceTransformer '{MODEL_NAME}' …")
    from sentence_transformers import SentenceTransformer
    _ST_MODEL = SentenceTransformer(MODEL_NAME)
    logger.info(" Model ready.")
    return _ST_MODEL
def compute_embeddings(texts: list[str], concept: str) -> np.ndarray:
    """
    Return float32 embeddings for *texts*, using the on-disk cache if possible.

    The cache key is built from the concept name, the number of texts and
    ``_text_fingerprint(texts)``. On a cache miss the texts are encoded with
    the shared model (batch size 32, ``normalize_embeddings=True``), cast to
    float32, saved under that key and returned.
    """
    n_texts = len(texts)
    cache_key = f"emb_{concept}_{n_texts}_{_text_fingerprint(texts)}"

    cached = _load_npy(cache_key, expected_rows=n_texts)
    if cached is not None:
        logger.info(f" [{concept}] embeddings loaded from cache")
        return cached

    logger.info(f" [{concept}] encoding {n_texts:,} texts …")
    encoder = _get_model()
    encode_options = dict(
        batch_size=32,
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=True,
    )
    vectors = encoder.encode(texts, **encode_options).astype(np.float32)
    _save_npy(vectors, cache_key)
    return vectors
| # ============================================================================= | |
| # BUILD PER-CONCEPT ARTEFACTS | |
| # ============================================================================= | |
def _build_concept_artefacts(
    sub: pd.DataFrame,
    embs: np.ndarray,
    concept: str,
) -> dict[str, Path]:
    """
    Ensure the per-concept artefact files exist and return their paths.

    Returned dict keys:
    embeddings — path of the cached .npy array (written by compute_embeddings)
    metadata_parquet — parquet copy of *sub* with row_id, concept,
        embedding_dim, n_texts, model and created_at columns added
    metadata_json — JSON preview holding summary fields, the column names
        and the first 5 rows of *sub*

    The parquet and JSON files are only written when they do not exist yet.
    """
    n_rows = len(sub)
    fingerprint = _text_fingerprint(sub["text_excerpt"].tolist())
    key_tail = f"{concept}_{n_rows}_{fingerprint}"

    # ── embeddings (same key format that compute_embeddings uses) ────────────
    emb_path = _npy_path(f"emb_{key_tail}")

    # ── metadata parquet ─────────────────────────────────────────────────────
    meta_key = f"meta_{key_tail}"
    meta_path = _parquet_path(meta_key)
    if meta_path.exists():
        logger.info(f" [{concept}] metadata parquet already cached")
    else:
        meta_df = sub.copy()
        meta_df.insert(0, "row_id", range(n_rows))
        meta_df.insert(1, "concept", concept)
        meta_df["embedding_dim"] = embs.shape[1]
        meta_df["n_texts"] = n_rows
        meta_df["model"] = MODEL_NAME
        meta_df["created_at"] = datetime.now().isoformat(timespec="seconds")
        _save_parquet(meta_df, meta_key)

    # ── metadata JSON (first 5 rows + schema) ────────────────────────────────
    json_key = f"meta_preview_{key_tail}"
    json_path = _json_path(json_key)
    if json_path.exists():
        logger.info(f" [{concept}] metadata JSON already cached")
    else:
        preview = {
            "concept": concept,
            "n_texts": n_rows,
            "embedding_dim": int(embs.shape[1]),
            "model": MODEL_NAME,
            "created_at": datetime.now().isoformat(timespec="seconds"),
            "columns": list(sub.columns),
            "preview_rows": sub.head(5).to_dict(orient="records"),
        }
        _save_json(preview, json_key)

    return {
        "embeddings": emb_path,
        "metadata_parquet": meta_path,
        "metadata_json": json_path,
    }
| # ============================================================================= | |
| # HUGGINGFACE DATASET PUSH | |
| # ============================================================================= | |
def _push_concept_to_hf(
    concept: str,
    paths: dict[str, Path],
    token: str,
) -> str:
    """
    Upload one concept's artefact files to the HF dataset repository.

    Files are stored remotely as:
    {concept}/embeddings.npy
    {concept}/metadata.parquet
    {concept}/metadata_preview.json

    Local files that do not exist are skipped with a warning. Returns a
    status string starting with "✓" on success; on any exception the error
    and traceback are logged and returned in a string starting with "✗".
    """
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=token)

        # Ensure the dataset repo exists (creates if needed)
        try:
            api.repo_info(repo_id=HF_DATASET_REPO, repo_type="dataset")
        except Exception:
            logger.info(f" Creating dataset repo '{HF_DATASET_REPO}' …")
            api.create_repo(
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                private=False,
                exist_ok=True,
            )

        remote_names = (
            ("embeddings", "embeddings.npy"),
            ("metadata_parquet", "metadata.parquet"),
            ("metadata_json", "metadata_preview.json"),
        )
        uploads = [
            (paths[role], f"{concept}/{file_name}")
            for role, file_name in remote_names
        ]
        for local_path, remote_path in uploads:
            if not local_path.exists():
                logger.warning(f" Skipping missing file: {local_path}")
                continue
            logger.info(f" Uploading {local_path.name} → {remote_path} …")
            stamp = datetime.now().isoformat(timespec='minutes')
            api.upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=remote_path,
                repo_id=HF_DATASET_REPO,
                repo_type="dataset",
                commit_message=f"[{concept}] update embeddings {stamp}",
            )
            logger.info(f" ✓ {remote_path}")
        return f"✓ [{concept}] pushed to {HF_DATASET_REPO}"
    except Exception as exc:
        failure = f"✗ [{concept}] HF push failed: {exc}\n{traceback.format_exc()}"
        logger.error(failure)
        return failure
def _push_dataset_card(token: str, summary: dict) -> None:
    """Build the dataset README.md and upload it to the HF dataset repo.

    *summary* maps each concept name to a dict providing ``n`` (number of
    excerpts) and ``dim`` (embedding dimension); one heading line is written
    per concept. Any failure is logged as a warning and not re-raised.
    """
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=token)

        # YAML front matter, title and file-layout description.
        header = [
            "---",
            "license: cc-by-4.0",
            "language:",
            "- de",
            "tags:",
            "- embeddings",
            "- curriculum",
            "- education",
            "- german",
            "- sentence-transformers",
            "---",
            "",
            "# German Curriculum Concept Embeddings",
            "",
            "Sentence embeddings for three focus concepts from the German school curriculum analysis project.",
            "",
            f"**Model:** `{MODEL_NAME}` ",
            f"**Generated:** {datetime.now().isoformat(timespec='seconds')} ",
            "",
            "## Structure",
            "",
            "```",
            "concept/",
            " embeddings.npy # float32 (N, 768) L2-normalised",
            " metadata.parquet # one row per excerpt, all CSV columns",
            " metadata_preview.json # schema + first 5 rows",
            "```",
            "",
            "## Concepts",
            "",
        ]

        # One heading per concept with its size and dimensionality.
        concept_lines = [
            f"### `{concept}` — {info['n']:,} excerpts · dim={info['dim']}"
            for concept, info in summary.items()
        ]

        # Usage snippet and source attribution.
        footer = [
            "",
            "## Usage",
            "",
            "```python",
            "import numpy as np",
            "import pandas as pd",
            "from huggingface_hub import hf_hub_download",
            "",
            "concept = 'evolution'",
            "",
            "emb_path = hf_hub_download(",
            f' repo_id="{HF_DATASET_REPO}",',
            ' repo_type="dataset",',
            ' filename=f"{concept}/embeddings.npy")',
            "",
            "meta_path = hf_hub_download(",
            f' repo_id="{HF_DATASET_REPO}",',
            ' repo_type="dataset",',
            ' filename=f"{concept}/metadata.parquet")',
            "",
            "embs = np.load(emb_path) # (N, 768)",
            "meta = pd.read_parquet(meta_path) # N rows",
            "```",
            "",
            "## Source",
            "",
            "Generated by the [Concept Atlas Space](https://huggingface.co/spaces/deirdosh/curriculum_analysis_german).",
        ]

        readme = "\n".join([*header, *concept_lines, *footer])
        api.upload_file(
            path_or_fileobj=readme.encode("utf-8"),
            path_in_repo="README.md",
            repo_id=HF_DATASET_REPO,
            repo_type="dataset",
            commit_message="update dataset card",
        )
        logger.info(" Dataset card (README.md) updated.")
    except Exception as exc:
        logger.warning(f" Dataset card update failed: {exc}")
| # ============================================================================= | |
| # PIPELINE STATE CACHE (lightweight JSON) | |
| # ============================================================================= | |
# Cache key under which the lightweight pipeline state JSON is stored.
_META_KEY = "embedding_pipeline_meta_v1"


def _save_state() -> None:
    """Persist the restorable part of ``_P`` to the JSON cache.

    Only ``timestamp``, ``concepts_done``, ``concept_meta`` and ``hf_status``
    are written; missing entries fall back to an empty value.
    """
    fields = (
        ("timestamp", ""),
        ("concepts_done", []),
        ("concept_meta", {}),
        ("hf_status", {}),
    )
    snapshot = {name: _P.get(name, fallback) for name, fallback in fields}
    _save_json(snapshot, _META_KEY)
def _load_state() -> bool:
    """Restore a previously saved pipeline state into ``_P``.

    Returns ``True`` when a non-empty state was found in the JSON cache and
    copied into ``_P``; ``False`` otherwise.
    """
    saved = _load_json(_META_KEY)
    if not saved:
        return False
    restored = {
        "timestamp": saved.get("timestamp", ""),
        "concepts_done": saved.get("concepts_done", []),
        "concept_meta": saved.get("concept_meta", {}),
        "hf_status": saved.get("hf_status", {}),
    }
    for name, value in restored.items():
        _P[name] = value
    logger.info(f" Prior run restored: {_P['concepts_done']} [{_P['timestamp']}]")
    return True
| # ============================================================================= | |
| # PIPELINE WORKER | |
| # ============================================================================= | |
def _pipeline_worker(token: str) -> None:
    """Run the full embedding pipeline; intended as a thread target.

    Steps: load the corpus, compute and cache embeddings plus metadata for
    each focus concept (saving state after every concept), upload the
    artefacts and dataset card when *token* is non-empty, then store the
    final timestamp. Any exception is logged with its traceback, and
    ``_PIPELINE_RUNNING`` is always cleared on exit.
    """

    def _banner(title: str) -> None:
        # Step heading framed by two horizontal rules.
        rule = "━" * 56
        logger.info(rule)
        logger.info(title)
        logger.info(rule)

    try:
        # ── Load corpus ───────────────────────────────────────────────────────
        _banner("STEP 1/4 Loading corpus CSV")
        corpus = _load_csv()
        _P["df"] = corpus
        # Work on copies of any state restored from a prior run.
        concepts_done = list(_P.get("concepts_done", []))
        concept_meta = dict(_P.get("concept_meta", {}))
        hf_status = dict(_P.get("hf_status", {}))

        # ── Per-concept embedding ─────────────────────────────────────────────
        _banner("STEP 2/4 Computing embeddings per concept")
        for concept in FOCUS_CONCEPTS:
            tag = f"[{concept.upper()}]"
            logger.info(f"\n{tag} ── filtering …")
            subset = _filter_concept(corpus, concept)
            count = len(subset)
            if count < 5:
                logger.warning(f" [{concept}] only {count} rows — skipping")
                continue

            excerpt_texts = subset["text_excerpt"].tolist()
            logger.info(f"{tag} ── embeddings ({count:,} texts) …")
            vectors = compute_embeddings(excerpt_texts, concept)
            logger.info(f"{tag} ── building artefact files …")
            artefacts = _build_concept_artefacts(subset, vectors, concept)

            # store in-memory for status display
            concept_meta[concept] = {
                "n": count,
                "dim": int(vectors.shape[1]),
                "fp": _text_fingerprint(excerpt_texts),
                "emb_path": str(artefacts["embeddings"]),
                "meta_path": str(artefacts["metadata_parquet"]),
            }
            if concept not in concepts_done:
                concepts_done.append(concept)
            _P.update(
                concepts_done=concepts_done,
                concept_meta=concept_meta,
            )
            _save_state()  # checkpoint after each concept

        # ── Push to HF ────────────────────────────────────────────────────────
        _banner("STEP 3/4 Pushing to HuggingFace dataset")
        if token:
            for concept in concepts_done:
                logger.info(f"\n[{concept.upper()}] ── uploading …")
                info = concept_meta[concept]
                preview_key = f"meta_preview_{concept}_{info['n']}_{info['fp']}"
                paths_for_push = {
                    "embeddings": Path(info["emb_path"]),
                    "metadata_parquet": Path(info["meta_path"]),
                    "metadata_json": _json_path(preview_key),
                }
                outcome = _push_concept_to_hf(concept, paths_for_push, token)
                hf_status[concept] = outcome
                logger.info(f" {outcome}")
            # dataset card
            logger.info("\nUpdating dataset README …")
            _push_dataset_card(token, concept_meta)
            _P.update(hf_status=hf_status)
        else:
            logger.warning(
                " HF_TOKEN not provided — skipping upload.\n"
                " Set HF_TOKEN as a Space secret or pass it in the UI.")

        # ── Finalise ──────────────────────────────────────────────────────────
        _banner("STEP 4/4 Saving final state")
        _P["timestamp"] = datetime.now().isoformat(timespec="seconds")
        _save_state()

        double_rule = "═" * 56
        logger.info("\n" + double_rule)
        logger.info("✓ PIPELINE COMPLETE")
        logger.info(f" Timestamp : {_P['timestamp']}")
        for concept in concepts_done:
            info = concept_meta[concept]
            pushed = hf_status.get(concept, 'not pushed')[:40]
            logger.info(
                f" {concept:12s}: {info['n']:,} texts · "
                f"dim={info['dim']} · "
                f"HF={pushed}")
        logger.info(double_rule)
    except Exception:
        logger.error(f"Pipeline error:\n{traceback.format_exc()}")
    finally:
        _PIPELINE_RUNNING.clear()
| # ============================================================================= | |
| # PUBLIC API (called by Gradio buttons) | |
| # ============================================================================= | |
def launch_pipeline(hf_token: str) -> None:
    """Start the embedding pipeline in a background daemon thread.

    Does nothing (apart from logging) when a run is already in progress.
    The token is taken from *hf_token* when it is non-blank, otherwise from
    the ``HF_TOKEN`` environment variable. A ``None`` token is treated as
    blank. If the worker thread cannot be started, the running flag is
    cleared again so later launches are not blocked, and the error is
    re-raised.
    """
    global _PIPELINE_THREAD
    if _PIPELINE_RUNNING.is_set():
        logger.info("Pipeline already running — wait for it to finish.")
        return
    _PIPELINE_RUNNING.set()
    logger.info("⏳ Pipeline launched …")
    token = (hf_token or "").strip() or os.environ.get("HF_TOKEN", "").strip()
    _PIPELINE_THREAD = threading.Thread(
        target=_pipeline_worker,
        args=(token,),
        name="pipeline",
        daemon=True,
    )
    try:
        _PIPELINE_THREAD.start()
    except Exception:
        # The worker's own `finally` never runs if the thread did not start.
        _PIPELINE_RUNNING.clear()
        raise
def get_status_md() -> str:
    """Return the Markdown status card shown in the UI.

    Produces a placeholder sentence when no concept has been processed yet,
    otherwise a table with one row per focus concept followed by a link to
    the dataset. The HF status cell shows only the first line of the stored
    status (at most 50 characters) with ``|`` escaped, because a failed push
    stores a multi-line message with a traceback that would otherwise break
    the table.
    """
    if not _P.get("concepts_done"):
        return "_No pipeline run yet — click **▶ Run** to start._"
    concept_meta = _P.get("concept_meta", {})
    hf_status = _P.get("hf_status", {})
    lines = [
        f"**Last run:** {_P.get('timestamp','—')}",
        "",
        "| Concept | N texts | Dim | Local cache | HF status |",
        "|---|---|---|---|---|",
    ]
    for concept in FOCUS_CONCEPTS:
        meta = concept_meta.get(concept, {})
        n = f"{meta.get('n', 0):,}" if meta else "—"
        dim = str(meta.get("dim", "—"))
        cached = "✓" if meta.get("emb_path") and Path(meta["emb_path"]).exists() else "—"
        raw_status = str(hf_status.get(concept, "—")).strip()
        first_line = raw_status.splitlines()[0] if raw_status else "—"
        hfs = first_line[:50].replace("|", "\\|")
        lines.append(f"| {concept.capitalize()} | {n} | {dim} | {cached} | {hfs} |")
    # dataset link
    lines += [
        "",
        f"**Dataset:** [{HF_DATASET_REPO}]"
        f"(https://huggingface.co/datasets/{HF_DATASET_REPO})",
    ]
    return "\n".join(lines)
def get_cache_inventory() -> str:
    """Return a Markdown listing of the regular files in the cache and data dirs.

    Only regular files are counted and listed, so the number in the heading
    always matches the number of bullet lines. Each file is stat'ed once, and
    files that disappear between listing and stat (e.g. temp files being
    renamed) are skipped instead of raising.
    """
    entries: list[tuple[str, int, float]] = []
    for directory in (CACHE_DIR, DATA_DIR):
        for path in sorted(directory.glob("*")):
            if not path.is_file():
                continue
            try:
                info = path.stat()
            except FileNotFoundError:
                continue
            entries.append((path.name, info.st_size, info.st_mtime))
    if not entries:
        return "_Cache is empty._"
    total = sum(size for _, size, _ in entries)
    lines = [f"### {len(entries)} cached files ({total/1_048_576:.2f} MB total)", ""]
    for name, size, mtime in entries:
        sz = size / 1024
        ts = datetime.fromtimestamp(mtime).strftime("%m-%d %H:%M")
        lines.append(f"- `{name}` — {sz:.1f} KB · {ts}")
    return "\n".join(lines)
def download_concept_embeddings(concept: str) -> str | None:
    """Return the path of the concept's ``.npy`` embedding file for gr.File.

    The path recorded in ``_P["concept_meta"]`` is used when it points to an
    existing regular file. Otherwise the newest matching ``emb_{concept}_*``
    file in the cache directory is returned, or ``None`` if there is none.

    A missing or empty recorded path is never turned into a ``Path``:
    ``Path("")`` is the current directory, which always exists and would be
    returned instead of a file.
    """
    meta = _P.get("concept_meta", {}).get(concept) or {}
    recorded = meta.get("emb_path")
    if recorded and Path(recorded).is_file():
        return str(Path(recorded))
    # Fall back to the newest cache file, ignoring in-progress temp files.
    candidates = [
        p for p in CACHE_DIR.glob(f"emb_{concept}_*__*.npy")
        if p.is_file() and not p.name.endswith("__tmp.npy")
    ]
    if not candidates:
        return None
    return str(max(candidates, key=lambda p: p.stat().st_mtime))
def download_concept_metadata(concept: str) -> str | None:
    """Return the path of the concept's ``.parquet`` metadata file for gr.File.

    The path recorded in ``_P["concept_meta"]`` is used when it points to an
    existing regular file. Otherwise the newest matching ``meta_{concept}_*``
    parquet file in the cache directory is returned, or ``None`` if there is
    none.

    A missing or empty recorded path is never turned into a ``Path``:
    ``Path("")`` is the current directory, which always exists and would be
    returned instead of a file.
    """
    meta = _P.get("concept_meta", {}).get(concept) or {}
    recorded = meta.get("meta_path")
    if recorded and Path(recorded).is_file():
        return str(Path(recorded))
    # Fall back to the newest cache file, ignoring in-progress temp files.
    candidates = [
        p for p in CACHE_DIR.glob(f"meta_{concept}_*__*.parquet")
        if p.is_file() and not p.name.endswith("__tmp.parquet")
    ]
    if not candidates:
        return None
    return str(max(candidates, key=lambda p: p.stat().st_mtime))
def download_all_zip() -> str | None:
    """Bundle all concept artefacts and the log file into a single ZIP.

    For each focus concept the embedding file, the metadata parquet and (when
    state for the concept is known) the JSON preview are collected, followed
    by the pipeline log. Only paths that are existing regular files are
    archived, so a directory or stale path returned by a lookup can never end
    up in the ZIP. Files are stored flat under their base names.

    Returns:
        Path of the ZIP written to the cache directory, or ``None`` when
        there is nothing to archive.
    """
    import zipfile

    candidates: list[Path] = []
    for concept in FOCUS_CONCEPTS:
        emb = download_concept_embeddings(concept)
        mta = download_concept_metadata(concept)
        candidates.extend(Path(found) for found in (emb, mta) if found)
        # JSON preview
        meta = _P.get("concept_meta", {}).get(concept, {})
        if meta:
            candidates.append(_json_path(
                f"meta_preview_{concept}_{meta.get('n',0)}_{meta.get('fp','')}"))
    candidates.append(LOG_FILE)

    files = [p for p in candidates if p.is_file()]
    if not files:
        return None
    out = CACHE_DIR / f"curriculum_embeddings_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in files:
            zf.write(p, p.name)
    logger.info(f" ZIP → {out.name} {out.stat().st_size//1024} KB")
    return str(out)
| # ============================================================================= | |
| # GRADIO UI | |
| # ============================================================================= | |
# HTML banner rendered at the top of the UI via gr.HTML in build_ui().
# The model name and dataset link are written out literally here rather than
# interpolated from MODEL_NAME / HF_DATASET_REPO.
_HDR = """
<div style="
background: linear-gradient(135deg,#0F0B2D 0%,#1E1A56 40%,#0A2818 80%,#2D1200 100%);
padding:28px 34px 22px; border-radius:14px; margin-bottom:12px;
box-shadow:0 8px 32px rgba(0,0,0,.55), inset 0 1px 0 rgba(255,255,255,.07);
">
<h1 style="color:#fff;margin:0 0 7px;font-size:26px;font-weight:700;">
🔭 Concept Atlas — Embedding Generator
</h1>
<p style="color:rgba(255,255,255,.68);margin:0;font-size:13px;line-height:1.7;">
Generates <b>sentence embeddings</b> for
<b style="color:#C084FC">mensch</b> ·
<b style="color:#34D399">verhalten</b> ·
<b style="color:#FB923C">evolution</b>
and pushes them to a public HuggingFace dataset.
</p>
<p style="color:rgba(255,255,255,.35);margin:6px 0 0;font-size:11px;">
Model: paraphrase-multilingual-mpnet-base-v2 |
Dataset:
<a href="https://huggingface.co/datasets/deirdosh/curriculum_embeddings"
style="color:#C084FC;text-decoration:none;">
deirdosh/curriculum_embeddings
</a>
</p>
</div>
"""
# Markdown help text shown on the "Run Pipeline" tab via gr.Markdown in
# build_ui().
_INSTRUCTIONS = """
### What this app does
1. Downloads the German curriculum corpus CSV (~35k excerpts)
2. Filters excerpts for each of the three focus concepts
3. Encodes every excerpt with `paraphrase-multilingual-mpnet-base-v2`
(768-dim, L2-normalised, float32)
4. Saves per-concept artefacts:
- `embeddings.npy` — shape `(N, 768)`
- `metadata.parquet` — all CSV columns + `row_id`, `concept`, `model`, timestamps
- `metadata_preview.json` — schema + first 5 rows
5. Pushes all artefacts to
`huggingface.co/datasets/deirdosh/curriculum_embeddings`
### How to use
| Step | Action |
|---|---|
| 1 | Paste your `HF_TOKEN` (needs *write* access to the dataset repo) |
| 2 | Click **▶ Run Pipeline** |
| 3 | Watch the live log — each concept takes ~10 min on CPU |
| 4 | Download individual files or the full ZIP below |
> All steps are individually cached. Re-running skips already-computed embeddings.
> The HF_TOKEN can also be set as a **Space secret** — leave the field blank if so.
"""
# Custom stylesheet passed to gr.Blocks(css=...) in build_ui(): dark
# background, violet primary buttons, monospace log textbox, hidden footer.
_CSS = """
body,.gradio-container{background:#0F1120!important;}
.gr-button-primary{background:#6D28D9!important;border-color:#6D28D9!important;color:#fff!important;}
.gr-button-primary:hover{background:#5B21B6!important;}
.gr-button-secondary{background:#1E293B!important;border-color:#334155!important;color:#94A3B8!important;}
.gr-button-secondary:hover{background:#334155!important;color:#E2E8F0!important;}
.gr-textbox textarea{background:#0F1120!important;color:#E2E8F0!important;
border-color:#2D3555!important;font-family:monospace!important;font-size:12px!important;}
label{color:#94A3B8!important;}
.gr-markdown{color:#CBD5E1!important;}
.gr-markdown h3{color:#A78BFA!important;}
.gr-markdown code{background:#1E293B!important;color:#C084FC!important;}
footer{display:none!important;}
"""
def build_ui() -> gr.Blocks:
    """Assemble and return the Gradio Blocks application.

    Layout: a header banner, then three tabs — "Run Pipeline" (token box,
    run button, live log polled every 3 s), "Status" (status card and cache
    inventory) and "Download" (per-concept file downloads, a ZIP of
    everything, and a usage snippet) — followed by a footer.

    The visible status card itself is polled every 5 s. Previously the
    polling was attached to a hidden duplicate component, so the card the
    user sees only changed when "Refresh status" was clicked.
    """
    with gr.Blocks(
        title="Concept Atlas — Embeddings",
        css=_CSS,
        theme=gr.themes.Base(
            primary_hue="violet", secondary_hue="emerald", neutral_hue="slate",
            font=[gr.themes.GoogleFont("Inter"), "system-ui"],
        ),
    ) as demo:
        gr.HTML(_HDR)
        with gr.Tabs():
            # ── 0: Run ───────────────────────────────────────────────────────
            with gr.TabItem("🚀 Run Pipeline"):
                gr.Markdown(_INSTRUCTIONS)
                with gr.Row():
                    token_box = gr.Textbox(
                        label="HuggingFace token (write access)",
                        placeholder="hf_… (or leave blank if HF_TOKEN secret is set)",
                        type="password",
                        scale=3,
                    )
                    run_btn = gr.Button("▶ Run Pipeline", variant="primary", scale=1)
                live_log = gr.Textbox(
                    label="Live pipeline log",
                    value=get_live_log,
                    interactive=False,
                    lines=28,
                    every=3,
                )
                run_btn.click(fn=launch_pipeline, inputs=token_box, outputs=None)

            # ── 1: Status ────────────────────────────────────────────────────
            with gr.TabItem("📋 Status"):
                status_btn = gr.Button("Refresh status", variant="secondary")
                # auto-refresh the visible card every 5 s
                status_md = gr.Markdown(value=get_status_md, every=5)
                status_btn.click(fn=get_status_md, outputs=status_md)
                gr.Markdown("---")
                cache_btn = gr.Button("Refresh cache inventory", variant="secondary")
                cache_md = gr.Markdown()
                cache_btn.click(fn=get_cache_inventory, outputs=cache_md)

            # ── 2: Download ──────────────────────────────────────────────────
            with gr.TabItem("⬇ Download"):
                gr.Markdown(
                    "### Download artefacts\n"
                    "Files are generated by the pipeline and cached locally. "
                    "They are also available at "
                    f"[{HF_DATASET_REPO}]"
                    f"(https://huggingface.co/datasets/{HF_DATASET_REPO})."
                )
                # ── per-concept ──────────────────────────────────────────────
                for concept, emoji, colour in [
                    ("mensch", "🔵", "#C084FC"),
                    ("verhalten", "🟢", "#34D399"),
                    ("evolution", "🟠", "#FB923C"),
                ]:
                    gr.Markdown(
                        f"#### {emoji} `{concept}` "
                        f"<span style='color:{colour}'>embeddings</span>")
                    with gr.Row():
                        be = gr.Button(f"Download {concept} embeddings (.npy)",
                                       variant="primary")
                        bm = gr.Button(f"Download {concept} metadata (.parquet)",
                                       variant="secondary")
                    with gr.Row():
                        fe = gr.File(label=f"{concept}_embeddings.npy")
                        fm = gr.File(label=f"{concept}_metadata.parquet")
                    # Bind the loop variable as a default so each button keeps
                    # its own concept (avoids the late-binding closure trap).
                    be.click(fn=lambda c=concept: download_concept_embeddings(c),
                             outputs=fe)
                    bm.click(fn=lambda c=concept: download_concept_metadata(c),
                             outputs=fm)
                gr.Markdown("---")
                gr.Markdown("#### 📦 Download everything")
                with gr.Row():
                    zip_btn = gr.Button("Download all artefacts as ZIP",
                                        variant="primary")
                    zip_file = gr.File(label="curriculum_embeddings_*.zip")
                zip_btn.click(fn=download_all_zip, outputs=zip_file)
                gr.Markdown(
                    "---\n"
                    "### Dataset usage\n"
                    "```python\n"
                    "import numpy as np\n"
                    "import pandas as pd\n"
                    "from huggingface_hub import hf_hub_download\n\n"
                    'concept = "evolution"\n\n'
                    "emb_path = hf_hub_download(\n"
                    f' repo_id="{HF_DATASET_REPO}",\n'
                    ' repo_type="dataset",\n'
                    ' filename=f"{concept}/embeddings.npy")\n\n'
                    "meta_path = hf_hub_download(\n"
                    f' repo_id="{HF_DATASET_REPO}",\n'
                    ' repo_type="dataset",\n'
                    ' filename=f"{concept}/metadata.parquet")\n\n'
                    "embs = np.load(emb_path) # (N, 768) float32\n"
                    "meta = pd.read_parquet(meta_path) # N rows\n"
                    "```"
                )
        gr.HTML(
            "<div style='text-align:center;padding:12px;font-size:11px;color:#475569;"
            "border-top:1px solid #1E293B;margin-top:8px;'>"
            "Concept Atlas · OpenEvo/CCS · "
            f"<a href='https://huggingface.co/datasets/{HF_DATASET_REPO}'"
            " style='color:#7C3AED;'>Dataset</a>"
            "</div>"
        )
    return demo
| # ============================================================================= | |
| # STARTUP — try to restore a prior run's state | |
| # ============================================================================= | |
def _startup() -> None:
    """Log a startup banner and try to restore the state of a prior run.

    Errors raised during the restore are logged as a warning and otherwise
    ignored, so the module can still be imported.
    """
    rule = "=" * 56
    logger.info(rule)
    logger.info("Concept Atlas — Embedding Generator — startup")
    logger.info(rule)
    try:
        restored = _load_state()
        if not restored:
            logger.info(" No prior cache — fresh start.")
        else:
            logger.info(f" Prior state restored: {_P.get('concepts_done')}")
    except Exception:
        logger.warning(f" Startup restore error:\n{traceback.format_exc()}")


# Executed at import time so restored state is available before the UI is built.
_startup()
| # ============================================================================= | |
if __name__ == "__main__":
    # Serve on all interfaces at port 7860, without a public share link.
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )
    demo = build_ui()
    demo.launch(**launch_options)