deirdosh's picture
Update app.py
c58d0a7 verified
"""
Concept Atlas — Embedding Generator
=====================================
Generates sentence embeddings for each focus concept (mensch, verhalten, evolution)
and pushes embeddings + metadata to a HuggingFace public dataset.
Space: https://huggingface.co/spaces/deirdosh/curriculum_analysis_german
Dataset: https://huggingface.co/datasets/deirdosh/curriculum_embeddings
"""
# ── stdlib ───────────────────────────────────────────────────────────────────
import os, sys, json, hashlib, warnings, logging, traceback, time, threading
import shutil
import urllib.request
from pathlib import Path
from datetime import datetime
warnings.filterwarnings("ignore")
LOG_FILE = Path("pipeline.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler(str(LOG_FILE), mode="a", encoding="utf-8"),
],
)
logger = logging.getLogger(__name__)
# ── third-party ──────────────────────────────────────────────────────────────
import numpy as np
import pandas as pd
import gradio as gr
# ── lazy heavy imports ────────────────────────────────────────────────────────
_ST_MODEL = None
# ── directories ──────────────────────────────────────────────────────────────
CACHE_DIR = Path("cache")
DATA_DIR = Path("data")
for _d in (CACHE_DIR, DATA_DIR):
_d.mkdir(parents=True, exist_ok=True)
# ── constants ─────────────────────────────────────────────────────────────────
FOCUS_CONCEPTS = ["mensch", "verhalten", "evolution"]
MODEL_NAME = "paraphrase-multilingual-mpnet-base-v2"
# Source corpus CSV (this Space's own file)
CSV_URL = (
"https://huggingface.co/spaces/deirdosh/curriculum_analysis_german"
"/resolve/main/data/curriculum_excerpts.csv"
)
# Target HuggingFace dataset repository
HF_DATASET_REPO = "deirdosh/curriculum_embeddings"
# ── pipeline state ─────────────────────────────────────────────────────────
_P: dict = {}
_PIPELINE_RUNNING = threading.Event()
_PIPELINE_THREAD: threading.Thread | None = None
# Live log ring-buffer for the UI ticker
_LOG_LINES: list[str] = []
_LOG_LOCK = threading.Lock()
_MAX_LOG = 400
class _UILogHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
with _LOG_LOCK:
_LOG_LINES.append(self.format(record))
if len(_LOG_LINES) > _MAX_LOG:
_LOG_LINES.pop(0)
_ui_handler = _UILogHandler()
_ui_handler.setFormatter(
logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S"))
logging.getLogger(__name__).addHandler(_ui_handler)
def get_live_log() -> str:
with _LOG_LOCK:
lines = list(_LOG_LINES)
return "\n".join(lines[-100:])
# =============================================================================
# ATOMIC CACHE HELPERS
# =============================================================================
def _ckey(logical: str) -> str:
return hashlib.md5(logical.encode()).hexdigest()[:10]
def _npy_path(key: str) -> Path:
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in key)[:60]
return CACHE_DIR / f"{safe}__{_ckey(key)}.npy"
def _json_path(key: str) -> Path:
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in key)[:60]
return CACHE_DIR / f"{safe}__{_ckey(key)}.json"
def _parquet_path(key: str) -> Path:
safe = "".join(c if c.isalnum() or c in "-_" else "_" for c in key)[:60]
return CACHE_DIR / f"{safe}__{_ckey(key)}.parquet"
def _save_npy(arr: np.ndarray, key: str) -> Path:
dest = _npy_path(key)
tmp = dest.with_name(dest.stem + "__tmp.npy")
try:
np.save(tmp, arr) # numpy writes exactly tmp (ends in .npy)
shutil.move(str(tmp), str(dest))
logger.info(f" [cache ✓] {dest.name} shape={arr.shape} dtype={arr.dtype}")
return dest
except Exception as exc:
tmp.unlink(missing_ok=True)
raise RuntimeError(f"_save_npy failed '{key}': {exc}") from exc
def _load_npy(key: str, expected_rows: int | None = None) -> np.ndarray | None:
p = _npy_path(key)
if not p.exists():
return None
try:
arr = np.load(p, allow_pickle=False)
if arr.size == 0:
raise ValueError("empty array")
if expected_rows is not None and arr.shape[0] != expected_rows:
raise ValueError(f"rows {arr.shape[0]}{expected_rows}")
logger.info(f" [cache ↑] {p.name} shape={arr.shape}")
return arr
except Exception as exc:
logger.warning(f" [cache ✗] {p.name}: {exc} — deleting")
p.unlink(missing_ok=True)
return None
def _save_json(obj, key: str) -> Path:
p = _json_path(key)
tmp = p.with_name(p.stem + "__tmp.json")
try:
tmp.write_text(
json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
shutil.move(str(tmp), str(p))
logger.info(f" [cache ✓] {p.name}")
return p
except Exception as exc:
tmp.unlink(missing_ok=True)
raise RuntimeError(f"_save_json failed '{key}': {exc}") from exc
def _load_json(key: str):
p = _json_path(key)
if not p.exists():
return None
try:
obj = json.loads(p.read_text(encoding="utf-8"))
logger.info(f" [cache ↑] {p.name}")
return obj
except Exception as exc:
logger.warning(f" [cache ✗] {p.name}: {exc} — deleting")
p.unlink(missing_ok=True)
return None
def _save_parquet(df: pd.DataFrame, key: str) -> Path:
dest = _parquet_path(key)
tmp = dest.with_name(dest.stem + "__tmp.parquet")
try:
df.to_parquet(tmp, index=False)
shutil.move(str(tmp), str(dest))
logger.info(f" [cache ✓] {dest.name} rows={len(df)}")
return dest
except Exception as exc:
tmp.unlink(missing_ok=True)
raise RuntimeError(f"_save_parquet failed '{key}': {exc}") from exc
def _text_fingerprint(texts: list[str]) -> str:
s = ((texts[0] if texts else "")
+ (texts[-1] if len(texts) > 1 else "")
+ str(len(texts)))
return hashlib.md5(s.encode()).hexdigest()[:8]
# =============================================================================
# DATA LOADING
# =============================================================================
def _load_csv() -> pd.DataFrame:
local = DATA_DIR / "curriculum_excerpts.csv"
if not local.exists():
logger.info("Downloading corpus CSV from HuggingFace …")
for attempt in range(4):
try:
urllib.request.urlretrieve(CSV_URL, local)
logger.info(f" Downloaded → {local}")
break
except Exception as exc:
logger.warning(f" Attempt {attempt+1} failed: {exc}")
time.sleep(3 * (attempt + 1))
else:
raise RuntimeError("Could not download CSV after 4 attempts.")
df = pd.read_csv(local, dtype=str).fillna("")
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
for req in ("search_term", "text_excerpt"):
if req not in df.columns:
raise ValueError(
f"CSV missing required column '{req}'. "
f"Found: {list(df.columns)}")
for opt in ("state", "subject", "grade", "school_type", "year", "file"):
if opt not in df.columns:
df[opt] = ""
df["search_term_lower"] = df["search_term"].str.lower().str.strip()
df["text_excerpt"] = df["text_excerpt"].str.strip()
df = df[df["text_excerpt"].str.len() > 20].reset_index(drop=True)
logger.info(f"CSV loaded: {len(df):,} rows | columns: {list(df.columns)}")
return df
def _filter_concept(df: pd.DataFrame, concept: str) -> pd.DataFrame:
sub = df[df["search_term_lower"] == concept].reset_index(drop=True)
if len(sub) < 5:
sub = df[df["search_term_lower"].str.contains(
concept, na=False)].reset_index(drop=True)
logger.info(f" [{concept}] {len(sub):,} rows after filtering")
return sub
# =============================================================================
# SENTENCE-TRANSFORMER
# =============================================================================
def _get_model():
global _ST_MODEL
if _ST_MODEL is None:
logger.info(f"Loading SentenceTransformer '{MODEL_NAME}' …")
from sentence_transformers import SentenceTransformer
_ST_MODEL = SentenceTransformer(MODEL_NAME)
logger.info(" Model ready.")
return _ST_MODEL
def compute_embeddings(texts: list[str], concept: str) -> np.ndarray:
"""
Encode texts with L2-normalised embeddings.
Cached by concept + n_texts + text fingerprint so a content change
automatically invalidates the cache.
"""
fp = _text_fingerprint(texts)
key = f"emb_{concept}_{len(texts)}_{fp}"
hit = _load_npy(key, expected_rows=len(texts))
if hit is not None:
logger.info(f" [{concept}] embeddings loaded from cache")
return hit
logger.info(f" [{concept}] encoding {len(texts):,} texts …")
model = _get_model()
arr = model.encode(
texts,
batch_size=32,
show_progress_bar=True,
convert_to_numpy=True,
normalize_embeddings=True,
).astype(np.float32)
_save_npy(arr, key)
return arr
# =============================================================================
# BUILD PER-CONCEPT ARTEFACTS
# =============================================================================
def _build_concept_artefacts(
sub: pd.DataFrame,
embs: np.ndarray,
concept: str,
) -> dict[str, Path]:
"""
Produce three files per concept and return a dict of {role: path}:
embeddings.npy — float32 array (N, 768)
metadata.parquet — one row per excerpt with all CSV columns + row_id
metadata.json — same but as JSON for easy inspection
"""
n = len(sub)
fp = _text_fingerprint(sub["text_excerpt"].tolist())
# ── embeddings ────────────────────────────────────────────────────────────
emb_key = f"emb_{concept}_{n}_{fp}"
emb_path = _npy_path(emb_key) # already saved by compute_embeddings
# ── metadata parquet ──────────────────────────────────────────────────────
meta_key = f"meta_{concept}_{n}_{fp}"
meta_path = _parquet_path(meta_key)
if not meta_path.exists():
meta_df = sub.copy()
meta_df.insert(0, "row_id", range(n))
meta_df.insert(1, "concept", concept)
meta_df["embedding_dim"] = embs.shape[1]
meta_df["n_texts"] = n
meta_df["model"] = MODEL_NAME
meta_df["created_at"] = datetime.now().isoformat(timespec="seconds")
_save_parquet(meta_df, meta_key)
else:
logger.info(f" [{concept}] metadata parquet already cached")
# ── metadata JSON (first 5 rows + schema) ─────────────────────────────────
json_key = f"meta_preview_{concept}_{n}_{fp}"
json_path = _json_path(json_key)
if not json_path.exists():
preview = {
"concept": concept,
"n_texts": n,
"embedding_dim": int(embs.shape[1]),
"model": MODEL_NAME,
"created_at": datetime.now().isoformat(timespec="seconds"),
"columns": list(sub.columns),
"preview_rows": sub.head(5).to_dict(orient="records"),
}
_save_json(preview, json_key)
else:
logger.info(f" [{concept}] metadata JSON already cached")
return {
"embeddings": emb_path,
"metadata_parquet": meta_path,
"metadata_json": json_path,
}
# =============================================================================
# HUGGINGFACE DATASET PUSH
# =============================================================================
def _push_concept_to_hf(
concept: str,
paths: dict[str, Path],
token: str,
) -> str:
"""
Upload a single concept's artefacts to the HF dataset repo.
Remote layout:
{concept}/embeddings.npy
{concept}/metadata.parquet
{concept}/metadata_preview.json
"""
try:
from huggingface_hub import HfApi
api = HfApi(token=token)
# Ensure the dataset repo exists (creates if needed)
try:
api.repo_info(repo_id=HF_DATASET_REPO, repo_type="dataset")
except Exception:
logger.info(f" Creating dataset repo '{HF_DATASET_REPO}' …")
api.create_repo(
repo_id=HF_DATASET_REPO,
repo_type="dataset",
private=False,
exist_ok=True,
)
uploads = [
(paths["embeddings"], f"{concept}/embeddings.npy"),
(paths["metadata_parquet"], f"{concept}/metadata.parquet"),
(paths["metadata_json"], f"{concept}/metadata_preview.json"),
]
for local_path, remote_path in uploads:
if not local_path.exists():
logger.warning(f" Skipping missing file: {local_path}")
continue
logger.info(f" Uploading {local_path.name}{remote_path} …")
api.upload_file(
path_or_fileobj=str(local_path),
path_in_repo=remote_path,
repo_id=HF_DATASET_REPO,
repo_type="dataset",
commit_message=(
f"[{concept}] update embeddings "
f"{datetime.now().isoformat(timespec='minutes')}"
),
)
logger.info(f" ✓ {remote_path}")
return f"✓ [{concept}] pushed to {HF_DATASET_REPO}"
except Exception as exc:
msg = f"✗ [{concept}] HF push failed: {exc}\n{traceback.format_exc()}"
logger.error(msg)
return msg
def _push_dataset_card(token: str, summary: dict) -> None:
"""Write/update a README.md dataset card on HF."""
try:
from huggingface_hub import HfApi
api = HfApi(token=token)
lines = [
"---",
"license: cc-by-4.0",
"language:",
"- de",
"tags:",
"- embeddings",
"- curriculum",
"- education",
"- german",
"- sentence-transformers",
"---",
"",
"# German Curriculum Concept Embeddings",
"",
"Sentence embeddings for three focus concepts from the German school "
"curriculum analysis project.",
"",
f"**Model:** `{MODEL_NAME}` ",
f"**Generated:** {datetime.now().isoformat(timespec='seconds')} ",
"",
"## Structure",
"",
"```",
"concept/",
" embeddings.npy # float32 (N, 768) L2-normalised",
" metadata.parquet # one row per excerpt, all CSV columns",
" metadata_preview.json # schema + first 5 rows",
"```",
"",
"## Concepts",
"",
]
for concept, info in summary.items():
lines.append(
f"### `{concept}` "
f"— {info['n']:,} excerpts · dim={info['dim']}"
)
lines += [
"",
"## Usage",
"",
"```python",
"import numpy as np",
"import pandas as pd",
"from huggingface_hub import hf_hub_download",
"",
"concept = 'evolution'",
"",
"emb_path = hf_hub_download(",
f' repo_id="{HF_DATASET_REPO}",',
' repo_type="dataset",',
' filename=f"{concept}/embeddings.npy")',
"",
"meta_path = hf_hub_download(",
f' repo_id="{HF_DATASET_REPO}",',
' repo_type="dataset",',
' filename=f"{concept}/metadata.parquet")',
"",
"embs = np.load(emb_path) # (N, 768)",
"meta = pd.read_parquet(meta_path) # N rows",
"```",
"",
"## Source",
"",
"Generated by the "
"[Concept Atlas Space]"
"(https://huggingface.co/spaces/deirdosh/curriculum_analysis_german).",
]
readme = "\n".join(lines)
api.upload_file(
path_or_fileobj=readme.encode("utf-8"),
path_in_repo="README.md",
repo_id=HF_DATASET_REPO,
repo_type="dataset",
commit_message="update dataset card",
)
logger.info(" Dataset card (README.md) updated.")
except Exception as exc:
logger.warning(f" Dataset card update failed: {exc}")
# =============================================================================
# PIPELINE STATE CACHE (lightweight JSON)
# =============================================================================
_META_KEY = "embedding_pipeline_meta_v1"
def _save_state() -> None:
state = {
"timestamp": _P.get("timestamp", ""),
"concepts_done": _P.get("concepts_done", []),
"concept_meta": _P.get("concept_meta", {}),
"hf_status": _P.get("hf_status", {}),
}
_save_json(state, _META_KEY)
def _load_state() -> bool:
state = _load_json(_META_KEY)
if not state:
return False
_P.update({
"timestamp": state.get("timestamp", ""),
"concepts_done": state.get("concepts_done", []),
"concept_meta": state.get("concept_meta", {}),
"hf_status": state.get("hf_status", {}),
})
logger.info(f" Prior run restored: {_P['concepts_done']} [{_P['timestamp']}]")
return True
# =============================================================================
# PIPELINE WORKER
# =============================================================================
def _pipeline_worker(token: str) -> None:
try:
# ── Load corpus ───────────────────────────────────────────────────────
logger.info("━" * 56)
logger.info("STEP 1/4 Loading corpus CSV")
logger.info("━" * 56)
df = _load_csv()
_P["df"] = df
concepts_done = list(_P.get("concepts_done", []))
concept_meta = dict(_P.get("concept_meta", {}))
hf_status = dict(_P.get("hf_status", {}))
# ── Per-concept embedding ─────────────────────────────────────────────
logger.info("━" * 56)
logger.info("STEP 2/4 Computing embeddings per concept")
logger.info("━" * 56)
for concept in FOCUS_CONCEPTS:
logger.info(f"\n[{concept.upper()}] ── filtering …")
sub = _filter_concept(df, concept)
n = len(sub)
if n < 5:
logger.warning(f" [{concept}] only {n} rows — skipping")
continue
texts = sub["text_excerpt"].tolist()
logger.info(f"[{concept.upper()}] ── embeddings ({n:,} texts) …")
embs = compute_embeddings(texts, concept)
logger.info(f"[{concept.upper()}] ── building artefact files …")
paths = _build_concept_artefacts(sub, embs, concept)
# store in-memory for status display
concept_meta[concept] = {
"n": n,
"dim": int(embs.shape[1]),
"fp": _text_fingerprint(texts),
"emb_path": str(paths["embeddings"]),
"meta_path": str(paths["metadata_parquet"]),
}
if concept not in concepts_done:
concepts_done.append(concept)
_P.update(dict(
concepts_done=concepts_done,
concept_meta=concept_meta,
))
_save_state() # checkpoint after each concept
# ── Push to HF ────────────────────────────────────────────────────────
logger.info("━" * 56)
logger.info("STEP 3/4 Pushing to HuggingFace dataset")
logger.info("━" * 56)
if not token:
logger.warning(
" HF_TOKEN not provided — skipping upload.\n"
" Set HF_TOKEN as a Space secret or pass it in the UI.")
else:
for concept in concepts_done:
logger.info(f"\n[{concept.upper()}] ── uploading …")
meta = concept_meta[concept]
paths_for_push = {
"embeddings": Path(meta["emb_path"]),
"metadata_parquet": Path(meta["meta_path"]),
"metadata_json": _json_path(
f"meta_preview_{concept}_{meta['n']}_{meta['fp']}"),
}
result = _push_concept_to_hf(concept, paths_for_push, token)
hf_status[concept] = result
logger.info(f" {result}")
# dataset card
logger.info("\nUpdating dataset README …")
_push_dataset_card(token, concept_meta)
_P.update(dict(hf_status=hf_status))
# ── Finalise ──────────────────────────────────────────────────────────
logger.info("━" * 56)
logger.info("STEP 4/4 Saving final state")
logger.info("━" * 56)
_P["timestamp"] = datetime.now().isoformat(timespec="seconds")
_save_state()
logger.info("\n" + "═" * 56)
logger.info("✓ PIPELINE COMPLETE")
logger.info(f" Timestamp : {_P['timestamp']}")
for concept in concepts_done:
m = concept_meta[concept]
logger.info(
f" {concept:12s}: {m['n']:,} texts · "
f"dim={m['dim']} · "
f"HF={hf_status.get(concept, 'not pushed')[:40]}")
logger.info("═" * 56)
except Exception:
logger.error(f"Pipeline error:\n{traceback.format_exc()}")
finally:
_PIPELINE_RUNNING.clear()
# =============================================================================
# PUBLIC API (called by Gradio buttons)
# =============================================================================
def launch_pipeline(hf_token: str) -> None:
"""Start the embedding pipeline in a background thread."""
global _PIPELINE_THREAD
if _PIPELINE_RUNNING.is_set():
logger.info("Pipeline already running — wait for it to finish.")
return
_PIPELINE_RUNNING.set()
logger.info("⏳ Pipeline launched …")
token = hf_token.strip() or os.environ.get("HF_TOKEN", "")
_PIPELINE_THREAD = threading.Thread(
target=_pipeline_worker,
args=(token,),
name="pipeline",
daemon=True,
)
_PIPELINE_THREAD.start()
def get_status_md() -> str:
"""Markdown summary card shown in the UI."""
if not _P.get("concepts_done"):
return "_No pipeline run yet — click **▶ Run** to start._"
lines = [
f"**Last run:** {_P.get('timestamp','—')}",
"",
"| Concept | N texts | Dim | Local cache | HF status |",
"|---|---|---|---|---|",
]
for concept in FOCUS_CONCEPTS:
meta = _P.get("concept_meta", {}).get(concept, {})
n = f"{meta.get('n', 0):,}" if meta else "—"
dim = str(meta.get("dim", "—"))
cached = "✓" if meta.get("emb_path") and Path(meta["emb_path"]).exists() else "—"
hfs = _P.get("hf_status", {}).get(concept, "—")[:50]
lines.append(f"| {concept.capitalize()} | {n} | {dim} | {cached} | {hfs} |")
# dataset link
lines += [
"",
f"**Dataset:** [{HF_DATASET_REPO}]"
f"(https://huggingface.co/datasets/{HF_DATASET_REPO})",
]
return "\n".join(lines)
def get_cache_inventory() -> str:
"""List all cached files."""
files = sorted(CACHE_DIR.glob("*")) + sorted(DATA_DIR.glob("*"))
if not files:
return "_Cache is empty._"
total = sum(f.stat().st_size for f in files if f.is_file())
lines = [f"### {len(files)} cached files ({total/1_048_576:.2f} MB total)", ""]
for f in files:
if f.is_file():
sz = f.stat().st_size / 1024
ts = datetime.fromtimestamp(f.stat().st_mtime).strftime("%m-%d %H:%M")
lines.append(f"- `{f.name}` — {sz:.1f} KB · {ts}")
return "\n".join(lines)
def download_concept_embeddings(concept: str) -> str | None:
"""Return path to the concept's .npy embedding file for gr.File."""
meta = _P.get("concept_meta", {}).get(concept, {})
p = Path(meta.get("emb_path", "")) if meta else Path("")
if p.exists():
return str(p)
# try to find by glob
candidates = sorted(CACHE_DIR.glob(f"emb_{concept}_*__*.npy"),
key=lambda x: x.stat().st_mtime, reverse=True)
return str(candidates[0]) if candidates else None
def download_concept_metadata(concept: str) -> str | None:
"""Return path to the concept's .parquet metadata file for gr.File."""
meta = _P.get("concept_meta", {}).get(concept, {})
p = Path(meta.get("meta_path", "")) if meta else Path("")
if p.exists():
return str(p)
candidates = sorted(CACHE_DIR.glob(f"meta_{concept}_*__*.parquet"),
key=lambda x: x.stat().st_mtime, reverse=True)
return str(candidates[0]) if candidates else None
def download_all_zip() -> str | None:
"""Bundle all concept artefacts into a single ZIP."""
import zipfile
files: list[Path] = []
for concept in FOCUS_CONCEPTS:
emb = download_concept_embeddings(concept)
mta = download_concept_metadata(concept)
if emb: files.append(Path(emb))
if mta: files.append(Path(mta))
# JSON preview
meta = _P.get("concept_meta", {}).get(concept, {})
if meta:
jp = _json_path(
f"meta_preview_{concept}_{meta.get('n',0)}_{meta.get('fp','')}")
if jp.exists():
files.append(jp)
if LOG_FILE.exists():
files.append(LOG_FILE)
if not files:
return None
out = CACHE_DIR / f"curriculum_embeddings_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf:
for p in files:
zf.write(p, p.name)
logger.info(f" ZIP → {out.name} {out.stat().st_size//1024} KB")
return str(out)
# =============================================================================
# GRADIO UI
# =============================================================================
_HDR = """
<div style="
background: linear-gradient(135deg,#0F0B2D 0%,#1E1A56 40%,#0A2818 80%,#2D1200 100%);
padding:28px 34px 22px; border-radius:14px; margin-bottom:12px;
box-shadow:0 8px 32px rgba(0,0,0,.55), inset 0 1px 0 rgba(255,255,255,.07);
">
<h1 style="color:#fff;margin:0 0 7px;font-size:26px;font-weight:700;">
🔭 Concept Atlas — Embedding Generator
</h1>
<p style="color:rgba(255,255,255,.68);margin:0;font-size:13px;line-height:1.7;">
Generates <b>sentence embeddings</b> for
<b style="color:#C084FC">mensch</b> ·
<b style="color:#34D399">verhalten</b> ·
<b style="color:#FB923C">evolution</b>
and pushes them to a public HuggingFace dataset.
</p>
<p style="color:rgba(255,255,255,.35);margin:6px 0 0;font-size:11px;">
Model: paraphrase-multilingual-mpnet-base-v2 &nbsp;|&nbsp;
Dataset:
<a href="https://huggingface.co/datasets/deirdosh/curriculum_embeddings"
style="color:#C084FC;text-decoration:none;">
deirdosh/curriculum_embeddings
</a>
</p>
</div>
"""
_INSTRUCTIONS = """
### What this app does
1. Downloads the German curriculum corpus CSV (~35k excerpts)
2. Filters excerpts for each of the three focus concepts
3. Encodes every excerpt with `paraphrase-multilingual-mpnet-base-v2`
(768-dim, L2-normalised, float32)
4. Saves per-concept artefacts:
- `embeddings.npy` — shape `(N, 768)`
- `metadata.parquet` — all CSV columns + `row_id`, `concept`, `model`, timestamps
- `metadata_preview.json` — schema + first 5 rows
5. Pushes all artefacts to
`huggingface.co/datasets/deirdosh/curriculum_embeddings`
### How to use
| Step | Action |
|---|---|
| 1 | Paste your `HF_TOKEN` (needs *write* access to the dataset repo) |
| 2 | Click **▶ Run Pipeline** |
| 3 | Watch the live log — each concept takes ~10 min on CPU |
| 4 | Download individual files or the full ZIP below |
> All steps are individually cached. Re-running skips already-computed embeddings.
> The HF_TOKEN can also be set as a **Space secret** — leave the field blank if so.
"""
_CSS = """
body,.gradio-container{background:#0F1120!important;}
.gr-button-primary{background:#6D28D9!important;border-color:#6D28D9!important;color:#fff!important;}
.gr-button-primary:hover{background:#5B21B6!important;}
.gr-button-secondary{background:#1E293B!important;border-color:#334155!important;color:#94A3B8!important;}
.gr-button-secondary:hover{background:#334155!important;color:#E2E8F0!important;}
.gr-textbox textarea{background:#0F1120!important;color:#E2E8F0!important;
border-color:#2D3555!important;font-family:monospace!important;font-size:12px!important;}
label{color:#94A3B8!important;}
.gr-markdown{color:#CBD5E1!important;}
.gr-markdown h3{color:#A78BFA!important;}
.gr-markdown code{background:#1E293B!important;color:#C084FC!important;}
footer{display:none!important;}
"""
def build_ui() -> gr.Blocks:
with gr.Blocks(
title="Concept Atlas — Embeddings",
css=_CSS,
theme=gr.themes.Base(
primary_hue="violet", secondary_hue="emerald", neutral_hue="slate",
font=[gr.themes.GoogleFont("Inter"), "system-ui"],
),
) as demo:
gr.HTML(_HDR)
with gr.Tabs():
# ── 0: Run ───────────────────────────────────────────────────────
with gr.TabItem("🚀 Run Pipeline"):
gr.Markdown(_INSTRUCTIONS)
with gr.Row():
token_box = gr.Textbox(
label="HuggingFace token (write access)",
placeholder="hf_… (or leave blank if HF_TOKEN secret is set)",
type="password",
scale=3,
)
run_btn = gr.Button("▶ Run Pipeline", variant="primary", scale=1)
live_log = gr.Textbox(
label="Live pipeline log",
value=get_live_log,
interactive=False,
lines=28,
every=3,
)
run_btn.click(fn=launch_pipeline, inputs=token_box, outputs=None)
# ── 1: Status ────────────────────────────────────────────────────
with gr.TabItem("📋 Status"):
status_btn = gr.Button("Refresh status", variant="secondary")
status_md = gr.Markdown(value=get_status_md)
# also auto-refresh every 5 s
status_auto = gr.Markdown(value=get_status_md, every=5, visible=False)
status_btn.click(fn=get_status_md, outputs=status_md)
gr.Markdown("---")
cache_btn = gr.Button("Refresh cache inventory", variant="secondary")
cache_md = gr.Markdown()
cache_btn.click(fn=get_cache_inventory, outputs=cache_md)
# ── 2: Download ───────────────────────────────────────────────────
with gr.TabItem("⬇ Download"):
gr.Markdown(
"### Download artefacts\n"
"Files are generated by the pipeline and cached locally. "
"They are also available at "
f"[{HF_DATASET_REPO}]"
f"(https://huggingface.co/datasets/{HF_DATASET_REPO})."
)
# ── per-concept ───────────────────────────────────────────────
for concept, emoji, colour in [
("mensch", "🔵", "#C084FC"),
("verhalten", "🟢", "#34D399"),
("evolution", "🟠", "#FB923C"),
]:
gr.Markdown(
f"#### {emoji} `{concept}` "
f"<span style='color:{colour}'>embeddings</span>")
with gr.Row():
be = gr.Button(f"Download {concept} embeddings (.npy)",
variant="primary")
bm = gr.Button(f"Download {concept} metadata (.parquet)",
variant="secondary")
with gr.Row():
fe = gr.File(label=f"{concept}_embeddings.npy")
fm = gr.File(label=f"{concept}_metadata.parquet")
be.click(fn=lambda c=concept: download_concept_embeddings(c),
outputs=fe)
bm.click(fn=lambda c=concept: download_concept_metadata(c),
outputs=fm)
gr.Markdown("---")
gr.Markdown("#### 📦 Download everything")
with gr.Row():
zip_btn = gr.Button("Download all artefacts as ZIP",
variant="primary")
zip_file = gr.File(label="curriculum_embeddings_*.zip")
zip_btn.click(fn=download_all_zip, outputs=zip_file)
gr.Markdown(
"---\n"
"### Dataset usage\n"
"```python\n"
"import numpy as np\n"
"import pandas as pd\n"
"from huggingface_hub import hf_hub_download\n\n"
f'concept = "evolution"\n\n'
"emb_path = hf_hub_download(\n"
f' repo_id="{HF_DATASET_REPO}",\n'
' repo_type="dataset",\n'
' filename=f"{concept}/embeddings.npy")\n\n'
"meta_path = hf_hub_download(\n"
f' repo_id="{HF_DATASET_REPO}",\n'
' repo_type="dataset",\n'
' filename=f"{concept}/metadata.parquet")\n\n'
"embs = np.load(emb_path) # (N, 768) float32\n"
"meta = pd.read_parquet(meta_path) # N rows\n"
"```"
)
gr.HTML(
"<div style='text-align:center;padding:12px;font-size:11px;color:#475569;"
"border-top:1px solid #1E293B;margin-top:8px;'>"
"Concept Atlas · OpenEvo/CCS · "
f"<a href='https://huggingface.co/datasets/{HF_DATASET_REPO}'"
" style='color:#7C3AED;'>Dataset</a>"
"</div>"
)
return demo
# =============================================================================
# STARTUP — try to restore a prior run's state
# =============================================================================
def _startup() -> None:
logger.info("=" * 56)
logger.info("Concept Atlas — Embedding Generator — startup")
logger.info("=" * 56)
try:
ok = _load_state()
if ok:
logger.info(f" Prior state restored: {_P.get('concepts_done')}")
else:
logger.info(" No prior cache — fresh start.")
except Exception:
logger.warning(f" Startup restore error:\n{traceback.format_exc()}")
_startup()
# =============================================================================
if __name__ == "__main__":
demo = build_ui()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
)