Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| HF Space app for browsing/searching a big SQLite corpus built by build_corpus_sqlite.py. | |
| Goal: | |
| - SIMPLE UI: type -> search -> pick -> open | |
| - Advanced knobs hidden unless you open "Advanced" | |
| - No "runs" UI (no run picking, no runs tab) | |
| What it does: | |
| - Loads corpus.sqlite (read-only) | |
| - FTS keyword search (chunks_fts) | |
| - Browse clusters across ALL runs (cluster_summary) | |
| - Open a uid -> show full text + context window (order_index +/- k within the same run_id) | |
| How it finds the DB: | |
| 1) If CORPUS_SQLITE_PATH is set, uses that | |
| 2) Else tries common local paths (./data/corpus.sqlite, ./dataset/corpus.sqlite, /data/corpus.sqlite, ./corpus.sqlite) | |
| 3) Else downloads from a dataset repo using huggingface_hub (set DATASET_REPO_ID and optional DATASET_FILENAME) | |
| Env vars you can set in the Space: | |
| - CORPUS_SQLITE_PATH : absolute/relative path to the sqlite file if it already exists in the container | |
| - DATASET_REPO_ID : like "yourname/your-dataset-repo" (repo_type=dataset) | |
| (also accepts a full HF URL; we'll extract repo_id) | |
| - DATASET_FILENAME : default "corpus.sqlite" | |
| - DB_LOCAL_DIR : default "./data" (where downloaded DB will be copied) | |
| Notes: | |
| - Opens sqlite in read-only mode | |
| - Uses thread-local sqlite connections (safer with Gradio) | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| import shutil | |
| import sqlite3 | |
| import threading | |
| import traceback | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from urllib.parse import quote, urlparse | |
| import gradio as gr | |
| try: | |
| from huggingface_hub import hf_hub_download | |
| except Exception: | |
| hf_hub_download = None # type: ignore | |
# Version tag of this app; build_ui() renders it in the page header.
APP_VERSION = "2026-02-01_app_f"
| # ----------------------------- | |
| # Env helpers (strip hidden whitespace/newlines) | |
| # ----------------------------- | |
| def _clean_env_value(v: str) -> str: | |
| if v is None: | |
| return "" | |
| s = str(v) | |
| s = s.replace("\r", "").replace("\n", "").replace("\t", " ") | |
| s = s.strip() | |
| s = "".join(ch for ch in s if ch.isprintable()) | |
| return s | |
def _env(name: str, default: str = "") -> str:
    """Read environment variable *name*, cleaned via ``_clean_env_value``.

    Returns *default* when the variable is unset or cleans down to an
    empty string.
    """
    raw = os.environ.get(name)
    if raw is not None:
        cleaned = _clean_env_value(raw)
        if cleaned:
            return cleaned
    return default
def _parse_dataset_ref(repo_like: str) -> Tuple[str, Optional[str]]:
    """Split a dataset reference into ``(repo_id, inferred_filename_or_None)``.

    Accepted forms:
      - ``"user/repo"``
      - ``"https://huggingface.co/datasets/user/repo"``
      - ``"https://huggingface.co/datasets/user/repo/blob/main/corpus.sqlite"``
      - ``"https://huggingface.co/datasets/user/repo/resolve/main/corpus.sqlite"``

    For the URL forms the file name (everything after the revision segment)
    is percent-decoded.  Anything that is not a recognised dataset URL is
    returned as the repo id with all whitespace removed.  An empty input
    yields ``("", None)``.
    """
    # Local import: the file-level import only brings in quote/urlparse.
    from urllib.parse import unquote

    s = _clean_env_value(repo_like)
    if not s:
        return "", None

    if s.startswith(("http://", "https://")):
        parts = (urlparse(s).path or "").strip("/").split("/")
        if len(parts) >= 3 and parts[0] == "datasets":
            repo_id = f"{parts[1]}/{parts[2]}"
            inferred_file: Optional[str] = None
            # Layout after the repo id is <marker>/<revision>/<path...>.
            # Only look at the segment right after the repo so that a user
            # or repo literally named "blob" is not mistaken for the marker.
            # NOTE(review): assumes the revision is a single path segment.
            tail = parts[3:]
            if len(tail) >= 3 and tail[0] in ("blob", "resolve"):
                inferred_file = unquote("/".join(tail[2:]))
            return repo_id, inferred_file

    # Plain "user/repo" (or an unrecognised URL): drop any stray whitespace.
    if any(ch.isspace() for ch in s):
        s = "".join(s.split())
    return s, None
| # ----------------------------- | |
| # Gradio compat shim (Dataframe args differ by version) | |
| # ----------------------------- | |
_UNEXPECTED_KW_RE = re.compile(r"unexpected keyword argument '([^']+)'")


def _df(**kwargs):
    """Construct ``gr.Dataframe`` while tolerating unsupported keyword args.

    On ``TypeError`` the offending keyword named in the error message is
    removed and construction is retried.  If the message names nothing we
    passed, ``max_rows`` and then ``wrap`` are dropped as a fallback.  Any
    other ``TypeError`` is re-raised.  After 32 failed attempts one final,
    unguarded construction is made.
    """
    remaining = dict(kwargs)
    attempts = 0
    while attempts < 32:
        attempts += 1
        try:
            return gr.Dataframe(**remaining)
        except TypeError as exc:
            match = _UNEXPECTED_KW_RE.search(str(exc))
            if match and match.group(1) in remaining:
                del remaining[match.group(1)]
                continue
            # Message did not identify one of our kwargs: drop a known
            # version-sensitive one, if we still carry any.
            fallback = next(
                (name for name in ("max_rows", "wrap") if name in remaining),
                None,
            )
            if fallback is None:
                raise
            del remaining[fallback]
    return gr.Dataframe(**remaining)
| # ----------------------------- | |
| # DB location / download | |
| # ----------------------------- | |
def _candidate_paths() -> List[Path]:
    """List the local locations to probe for the corpus DB, in priority order.

    ``CORPUS_SQLITE_PATH`` (if set) comes first, followed by the built-in
    default locations.  Each path is resolved when possible; a path that
    cannot be resolved is kept as given.
    """
    override = _env("CORPUS_SQLITE_PATH", "")
    raw: List[Path] = [Path(override).expanduser()] if override else []
    raw.extend(
        Path(location)
        for location in (
            "./data/corpus.sqlite",
            "./dataset/corpus.sqlite",
            "/data/corpus.sqlite",
            "./corpus.sqlite",
            "./data/corpus.db",
            "./dataset/corpus.db",
            "/data/corpus.db",
        )
    )

    resolved: List[Path] = []
    for cand in raw:
        try:
            resolved.append(cand.resolve())
        except Exception:
            resolved.append(cand)
    return resolved
def ensure_db_file() -> Path:
    """Locate the corpus SQLite file, downloading it from the Hub if needed.

    Resolution order:
      1. The first existing file among ``_candidate_paths()``.
      2. If ``DATASET_REPO_ID`` is set, ``hf_hub_download`` fetches
         ``DATASET_FILENAME`` (default ``corpus.sqlite``, or the file name
         inferred from a full dataset URL when ``DATASET_FILENAME`` is not
         set) and the file is copied into ``DB_LOCAL_DIR`` (default
         ``./data``).  If that copy fails, the hub cache path is returned.

    Returns:
        Path of the SQLite file to open.

    Raises:
        RuntimeError: no local file exists and no dataset repo is
            configured, or a repo is configured but ``huggingface_hub``
            could not be imported.
    """
    for cand in _candidate_paths():
        if cand.is_file():
            print(f"[db] using local file: {cand}")
            return cand

    ds_repo, inferred_file = _parse_dataset_ref(_env("DATASET_REPO_ID", ""))
    ds_repo = _clean_env_value(ds_repo)
    ds_file = _clean_env_value(_env("DATASET_FILENAME", "corpus.sqlite"))
    # A file name embedded in the repo URL only wins when DATASET_FILENAME
    # was not provided explicitly.
    if inferred_file and (not os.environ.get("DATASET_FILENAME") or not ds_file):
        ds_file = _clean_env_value(inferred_file)
    ds_file = ds_file or "corpus.sqlite"

    local_dir = Path(_env("DB_LOCAL_DIR", "./data")).expanduser().resolve()
    local_dir.mkdir(parents=True, exist_ok=True)
    target = (local_dir / ds_file).resolve()

    print(f"[db] DATASET_REPO_ID={ds_repo!r}")
    print(f"[db] DATASET_FILENAME={ds_file!r}")
    print(f"[db] DB_LOCAL_DIR={str(local_dir)!r}")

    if not ds_repo:
        raise RuntimeError(
            "Could not find corpus sqlite file.\n"
            "Fix: set CORPUS_SQLITE_PATH or set DATASET_REPO_ID (and make sure the dataset has corpus.sqlite)."
        )
    if hf_hub_download is None:
        raise RuntimeError("DATASET_REPO_ID is set, but huggingface_hub is not installed. Add it to requirements.txt.")

    cached_path = str(
        hf_hub_download(
            repo_id=ds_repo,
            filename=ds_file,
            repo_type="dataset",
        )
    )

    # Copying into local_dir is best-effort; on any failure the file in the
    # hub cache is used directly.
    try:
        src = Path(cached_path).resolve()
        if target.exists():
            try:
                same_size = target.stat().st_size == src.stat().st_size
            except OSError:
                same_size = False
            if same_size:
                print(f"[db] target already present (same size), using: {target}")
                return target
        # The file name may contain sub-directories (e.g. inferred from a
        # URL), so make sure the destination directory exists before copying.
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(str(src), str(target))
        print(f"[db] downloaded -> {target}")
        return target
    except Exception as e:
        print(f"[db] copy to local_dir failed, using cache path instead: {cached_path} ({e})")
        return Path(cached_path).resolve()


# Resolved once at import time; get_conn() opens this path.
DB_PATH = ensure_db_file()
| # ----------------------------- | |
| # SQLite connection (thread-local) | |
| # ----------------------------- | |
| _tls = threading.local() | |
| def _connect_readonly(db_path: Path) -> sqlite3.Connection: | |
| uri_path = quote(db_path.as_posix(), safe="/:") | |
| uri = f"file:{uri_path}?mode=ro" | |
| conn = sqlite3.connect(uri, uri=True, check_same_thread=False) | |
| conn.row_factory = sqlite3.Row | |
| try: | |
| conn.execute("PRAGMA query_only=ON;") | |
| except Exception: | |
| pass | |
| try: | |
| conn.execute("PRAGMA temp_store=MEMORY;") | |
| except Exception: | |
| pass | |
| try: | |
| conn.execute("PRAGMA cache_size=-100000;") | |
| except Exception: | |
| pass | |
| return conn | |
def get_conn() -> sqlite3.Connection:
    """Return the calling thread's connection, opening it on first use."""
    conn = getattr(_tls, "conn", None)
    if conn is not None:
        return conn
    _tls.conn = _connect_readonly(DB_PATH)
    return _tls.conn
| # ----------------------------- | |
| # Query helpers | |
| # ----------------------------- | |
def table_exists(conn: sqlite3.Connection, name: str) -> bool:
    """Return True if ``sqlite_master`` lists a table or view called *name*."""
    sql = "SELECT 1 FROM sqlite_master WHERE type IN ('table','view') AND name=? LIMIT 1;"
    cursor = conn.cursor()
    cursor.execute(sql, (name,))
    hit = cursor.fetchone()
    cursor.close()
    return hit is not None
def normalize_fts_query(q: str) -> str:
    """Turn free-text user input into an FTS MATCH expression.

    - Empty / whitespace-only input returns ``""``.
    - Input that already looks like FTS syntax (contains a double quote,
      ``*``, parentheses, ``:``, or the words OR / AND / NOT / NEAR in any
      letter case) is passed through unchanged.
    - Otherwise the input is split on whitespace, surrounding punctuation
      is trimmed from each word, and the words are combined with ``AND``.
      Each word is emitted as a double-quoted string so that characters
      which are not valid in a bare FTS5 term (``-``, ``'``, ``/`` ...)
      do not cause a query syntax error.
    - If trimming leaves no words, the stripped input is returned as-is.
    """
    q = (q or "").strip()
    if not q:
        return ""

    ops = ('"', "*", " OR ", " AND ", " NOT ", " NEAR", "(", ")", ":")
    # Padded so an operator at the very start or end still matches " OP ".
    padded_upper = f" {q.upper()} "
    if any(op in q or op in padded_upper for op in ops):
        return q

    trimmed = (word.strip(".,;!?[]{}<>") for word in q.split())
    tokens = [tok for tok in trimmed if tok]
    if not tokens:
        return q
    # No token can contain '"' here: a quote triggers the pass-through above.
    return " AND ".join(f'"{tok}"' for tok in tokens)
def fetch_meta() -> List[List[Any]]:
    """Return the ``meta`` table as rows for a Dataframe, header row first.

    The result is ``[["k", "v"], [k1, v1], ...]`` ordered by key.  When the
    DB has no ``meta`` table, an ``[["error"], [message]]`` table is
    returned instead of raising, matching the other fetch helpers.
    """
    conn = get_conn()
    if not table_exists(conn, "meta"):
        return [["error"], ["meta table not found in this DB."]]

    cur = conn.cursor()
    try:
        cur.execute("SELECT k, v FROM meta ORDER BY k;")
        rows = cur.fetchall()
    finally:
        cur.close()
    return [["k", "v"]] + [[r["k"], r["v"]] for r in rows]
def fts_search(query: str, cluster_id: str, limit: int) -> List[List[Any]]:
    """Run a keyword search against ``chunks_fts`` joined with ``chunks``.

    Args:
        query: Free text; normalized with ``normalize_fts_query``.
        cluster_id: Optional cluster filter as text (blank = no filter).
        limit: Maximum number of rows; falsy means 50, clamped to 1..500.

    Returns:
        A table whose first row is the header
        ``uid, cluster_id, order_index, doc_id, source_file, cluster_prob,
        preview`` followed by one row per hit (preview is the first 220
        characters of the text).  Validation problems are reported as
        ``[["error"], [message]]``.

    Raises:
        sqlite3.Error: if the query fails both with and without bm25 ranking.
    """
    conn = get_conn()
    if not table_exists(conn, "chunks_fts"):
        return [["error"], ["FTS table (chunks_fts) not found in this DB."]]

    qn = normalize_fts_query(query)
    if not qn:
        return [["error"], ["empty query"]]

    where = ["(chunks_fts MATCH ?)"]
    params: List[Any] = [qn]

    cluster_id = (cluster_id or "").strip()
    if cluster_id:
        try:
            params.append(int(float(cluster_id)))
        except (TypeError, ValueError, OverflowError):
            return [["error"], [f"cluster_id must be an int (got {cluster_id!r})"]]
        where.append("c.cluster_id = ?")

    limit = int(limit) if limit else 50
    limit = max(1, min(500, limit))
    params.append(limit)

    where_sql = " AND ".join(where)
    select_sql = f"""
        SELECT
            c.uid,
            c.cluster_id,
            c.order_index,
            c.doc_id,
            c.source_file,
            c.cluster_prob,
            CASE
                WHEN length(c.text) > 220 THEN substr(c.text, 1, 220) || '…'
                ELSE c.text
            END AS preview
        FROM chunks_fts
        JOIN chunks c ON c.uid = chunks_fts.uid
        WHERE {where_sql}
    """
    sql_bm25 = select_sql + "ORDER BY bm25(chunks_fts)\nLIMIT ?;"
    sql_fallback = select_sql + "LIMIT ?;"

    headers = ["uid", "cluster_id", "order_index", "doc_id", "source_file", "cluster_prob", "preview"]
    cur = conn.cursor()
    try:
        try:
            cur.execute(sql_bm25, params)
        except sqlite3.Error:
            # Ranked query failed (e.g. bm25() unavailable): retry unranked.
            cur.execute(sql_fallback, params)
        rows = cur.fetchall()
    finally:
        cur.close()

    return [headers] + [[r[col] for col in headers] for r in rows]
def get_chunk_by_uid(uid: str) -> Optional[Dict[str, Any]]:
    """Fetch the ``chunks`` row whose uid equals *uid* as a dict, or None."""
    sql = """
        SELECT uid, run_id, chunk_id, order_index, doc_id, source_file, cluster_id, cluster_prob, bm25_density,
               idf_mass, token_count, unique_token_count, text
        FROM chunks
        WHERE uid=?
        LIMIT 1;
        """
    cursor = get_conn().cursor()
    cursor.execute(sql, (uid,))
    row = cursor.fetchone()
    cursor.close()
    return dict(row) if row else None
def get_context(run_id: str, order_index: int, window: int) -> List[Dict[str, Any]]:
    """Return the chunks surrounding a position within one run.

    Selects rows of ``chunks`` with the given *run_id* whose ``order_index``
    lies in ``[order_index - window, order_index + window]``, ordered by
    ``order_index``.  Each row is a dict with a ``preview`` field holding
    the first 220 characters of the text (followed by an ellipsis when
    truncated).
    """
    conn = get_conn()
    lo = int(order_index) - int(window)
    hi = int(order_index) + int(window)
    cur = conn.cursor()
    try:
        cur.execute(
            """
            SELECT uid, order_index, doc_id, source_file, cluster_id, cluster_prob,
                   CASE WHEN length(text) > 220 THEN substr(text, 1, 220) || '…' ELSE text END AS preview
            FROM chunks
            WHERE run_id=? AND order_index BETWEEN ? AND ?
            ORDER BY order_index;
            """,
            (run_id, lo, hi),
        )
        rows = cur.fetchall()
    finally:
        cur.close()
    return [dict(x) for x in rows]
def fetch_cluster_summary_all(top_n: int) -> List[List[Any]]:
    """List the largest clusters from ``cluster_summary`` (all runs).

    *top_n* falls back to 200 when falsy and is clamped to 1..2000.  The
    result is a header row followed by one row per cluster, largest
    ``n_chunks`` first; a missing table yields ``[["error"], [message]]``.
    """
    conn = get_conn()
    if not table_exists(conn, "cluster_summary"):
        return [["error"], ["cluster_summary not found in this DB."]]

    wanted = int(top_n) if top_n else 200
    wanted = max(1, min(2000, wanted))

    columns = ["run_id", "cluster_id", "n_chunks", "prob_avg", "bm25_density_avg", "idf_mass_avg", "token_count_avg"]
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT run_id, cluster_id, n_chunks, prob_avg, bm25_density_avg, idf_mass_avg, token_count_avg
        FROM cluster_summary
        ORDER BY n_chunks DESC
        LIMIT ?;
        """,
        (wanted,),
    )
    fetched = cursor.fetchall()
    cursor.close()

    table: List[List[Any]] = [columns]
    table.extend([record[col] for col in columns] for record in fetched)
    return table
def fetch_cluster_chunks(run_id: str, cluster_id: str, limit: int) -> List[List[Any]]:
    """List chunks belonging to one cluster of one run.

    Args:
        run_id: Run identifier (required).
        cluster_id: Cluster id as text; must parse as a number.
        limit: Maximum rows; falsy means 150, clamped to 1..2000.

    Returns:
        Header row ``uid, order_index, doc_id, source_file, cluster_prob,
        preview`` followed by the rows, highest ``cluster_prob`` first
        (ties by ``order_index``).  Invalid arguments are reported as
        ``[["error"], [message]]``.
    """
    conn = get_conn()
    run_id = (run_id or "").strip()
    cluster_id = (cluster_id or "").strip()
    if not run_id:
        return [["error"], ["missing run_id for this cluster"]]
    if not cluster_id:
        return [["error"], ["missing cluster_id"]]
    try:
        cid = int(float(cluster_id))
    except (TypeError, ValueError, OverflowError):
        return [["error"], [f"cluster_id must be int (got {cluster_id!r})"]]

    limit = int(limit) if limit else 150
    limit = max(1, min(2000, limit))

    headers = ["uid", "order_index", "doc_id", "source_file", "cluster_prob", "preview"]
    cur = conn.cursor()
    try:
        cur.execute(
            """
            SELECT uid, order_index, doc_id, source_file, cluster_prob,
                   CASE WHEN length(text) > 220 THEN substr(text, 1, 220) || '…' ELSE text END AS preview
            FROM chunks
            WHERE run_id=? AND cluster_id=?
            ORDER BY cluster_prob DESC, order_index ASC
            LIMIT ?;
            """,
            (run_id, cid, limit),
        )
        rows = cur.fetchall()
    finally:
        cur.close()

    return [headers] + [[r[col] for col in headers] for r in rows]
| # ----------------------------- | |
| # UI helpers | |
| # ----------------------------- | |
| def _fmt_debug(e: BaseException) -> str: | |
| tb = traceback.format_exc() | |
| if len(tb) > 6000: | |
| tb = tb[-6000:] | |
| return f"```text\n{tb}\n```" | |
| def _blank_results_table() -> List[List[Any]]: | |
| return [["uid", "cluster_id", "order_index", "doc_id", "source_file", "cluster_prob", "preview"]] | |
| def _blank_cluster_table() -> List[List[Any]]: | |
| return [["run_id", "cluster_id", "n_chunks", "prob_avg", "bm25_density_avg", "idf_mass_avg", "token_count_avg"]] | |
| def _blank_cluster_chunks_table() -> List[List[Any]]: | |
| return [["uid", "order_index", "doc_id", "source_file", "cluster_prob", "preview"]] | |
| def _blank_ctx_table() -> List[List[Any]]: | |
| return [["uid", "order_index", "cluster_id", "cluster_prob", "doc_id", "source_file", "preview"]] | |
| def _pack_choice(uid: str, preview: str) -> str: | |
| uid = (uid or "").strip() | |
| preview = (preview or "").replace("\n", " ").replace("\r", " ").strip() | |
| preview = re.sub(r"\s+", " ", preview) | |
| if len(preview) > 160: | |
| preview = preview[:160] + "β¦" | |
| return f"{uid} | {preview}" if preview else uid | |
| def _extract_uid(choice: str) -> str: | |
| s = (choice or "").strip() | |
| if not s: | |
| return "" | |
| if " | " in s: | |
| return s.split(" | ", 1)[0].strip() | |
| return s | |
| def _pack_cluster_choice(run_id: str, cluster_id: Any, n_chunks: Any) -> str: | |
| r = (str(run_id) if run_id is not None else "").strip() | |
| c = (str(cluster_id) if cluster_id is not None else "").strip() | |
| try: | |
| n = int(n_chunks) | |
| except Exception: | |
| n = n_chunks | |
| # user sees this; keep it readable and stable | |
| return f"{r} / {c} | {n}" | |
| def _extract_cluster_key(choice: str) -> Tuple[str, str]: | |
| """ | |
| choice format: "run_id / cluster_id | n" | |
| """ | |
| s = (choice or "").strip() | |
| if not s: | |
| return "", "" | |
| left = s.split(" | ", 1)[0].strip() | |
| if " / " in left: | |
| a, b = left.split(" / ", 1) | |
| return a.strip(), b.strip() | |
| # fallback: if someone pasted just a cluster_id | |
| return "", left.strip() | |
def _show_uid(uid: str, window: int) -> Tuple[str, str, List[List[Any]]]:
    """Load one chunk and its neighbours for display.

    Args:
        uid: Chunk uid to open.
        window: Number of neighbouring positions to include on each side.
            ``None`` means 3; ``0`` means only the chunk itself.

    Returns:
        ``(meta_text, full_text, ctx_table)``.  ``meta_text`` is one
        ``name: value`` line per metadata column, ``full_text`` is the
        chunk text cut to 20,000 characters, and ``ctx_table`` is the
        context grid (header row first).  A blank or unknown uid yields
        two empty strings and a header-only table.
    """
    uid = (uid or "").strip()
    if not uid:
        return "", "", _blank_ctx_table()

    ch = get_chunk_by_uid(uid)
    if not ch:
        return "", "", _blank_ctx_table()

    meta_fields = (
        "uid",
        "run_id",
        "chunk_id",
        "order_index",
        "doc_id",
        "source_file",
        "cluster_id",
        "cluster_prob",
        "bm25_density",
        "idf_mass",
        "token_count",
        "unique_token_count",
    )
    meta_text = "\n".join(f"{name}: {ch.get(name, '')}" for name in meta_fields)

    full_text = ch.get("text", "") or ""
    if len(full_text) > 20000:
        full_text = full_text[:20000] + "\n\n…(truncated to 20k chars)…"

    # `window or 3` would silently turn an explicit 0 into 3, so only a
    # missing value falls back to the default.
    radius = 3 if window is None else max(0, int(window))
    ctx = get_context(
        run_id=str(ch["run_id"]),
        order_index=int(ch["order_index"] or 0),
        window=radius,
    )

    ctx_table = _blank_ctx_table()
    columns = ("uid", "order_index", "cluster_id", "cluster_prob", "doc_id", "source_file", "preview")
    for r in ctx:
        ctx_table.append([r.get(col, "") for col in columns])
    return meta_text, full_text, ctx_table
| # ----------------------------- | |
| # Callbacks | |
| # ----------------------------- | |
def ui_search(query: str, limit: int, cluster_id: str):
    """Search-button callback.

    Returns a 5-tuple matching the outputs wired in ``build_ui``:
    ``(dropdown update, first uid, results table, status text, debug text)``.
    The first result is pre-selected.  Validation errors reported by
    ``fts_search`` are shown in the status with no debug text; unexpected
    exceptions are shown in the status with a traceback in the debug text.
    """
    try:
        tbl = fts_search(query=query, cluster_id=cluster_id, limit=limit)
        if tbl and tbl[0] and tbl[0][0] == "error":
            reason = tbl[1][0] if len(tbl) > 1 and tbl[1] else "Search error"
            # No exception happened here, so there is no traceback to show.
            return gr.update(choices=[], value=""), "", tbl, "⚠️ " + reason, ""

        choices: List[str] = [
            _pack_choice(str(row[0]), str(row[6]))
            for row in tbl[1:]
            if row and len(row) >= 7
        ]
        first_choice = choices[0] if choices else ""
        return (
            gr.update(choices=choices, value=first_choice),
            _extract_uid(first_choice) if choices else "",
            tbl,
            f"✅ Found {len(choices)} results.",
            "",
        )
    except Exception as e:
        return (
            gr.update(choices=[], value=""),
            "",
            _blank_results_table(),
            f"⚠️ {type(e).__name__}: {e}",
            _fmt_debug(e),
        )
def ui_pick_result(choice: str):
    """Dropdown-change callback: return the uid encoded in the picked label."""
    picked_uid = _extract_uid(choice)
    return picked_uid
def ui_open_uid(uid: str, ctx_window: int):
    """Open-button callback.

    Returns ``(meta text, chunk text, context table, status text, debug
    text)``.  A blank or unknown uid produces empty outputs and a warning
    status; unexpected exceptions additionally fill the debug text.
    """
    try:
        uid = (uid or "").strip()
        if not uid:
            return "", "", _blank_ctx_table(), "⚠️ Enter/pick a uid first.", ""
        meta, text, ctx = _show_uid(uid, ctx_window)
        if not meta and not text:
            return "", "", _blank_ctx_table(), f"⚠️ uid not found: {uid}", ""
        return meta, text, ctx, f"✅ Opened uid {uid}", ""
    except Exception as e:
        return "", "", _blank_ctx_table(), f"⚠️ {type(e).__name__}: {e}", _fmt_debug(e)
def ui_load_clusters_all(top_n: int):
    """"Load clusters" callback.

    Returns ``(clusters table, dropdown update, status text, debug text)``.
    Dropdown labels are built with ``_pack_cluster_choice`` and the first
    one is pre-selected.
    """
    try:
        tbl = fetch_cluster_summary_all(top_n=top_n)
        if tbl and tbl[0] and tbl[0][0] == "error":
            reason = tbl[1][0] if len(tbl) > 1 and tbl[1] else "Cluster summary error"
            return tbl, gr.update(choices=[], value=""), "⚠️ " + reason, ""

        choices: List[str] = [
            _pack_cluster_choice(str(row[0]), row[1], row[2])
            for row in tbl[1:]
            if row and len(row) >= 3
        ]
        status = f"✅ Loaded {len(choices)} clusters."
        return tbl, gr.update(choices=choices, value=(choices[0] if choices else "")), status, ""
    except Exception as e:
        return _blank_cluster_table(), gr.update(choices=[], value=""), f"⚠️ {type(e).__name__}: {e}", _fmt_debug(e)
def ui_load_cluster_chunks(cluster_choice: str, limit: int):
    """"Load chunks" callback for the Clusters tab.

    Returns ``(chunks table, dropdown update, first uid, status text,
    debug text)``.  The picked cluster label must carry a run id;
    otherwise the user is asked to pick a cluster from the list.
    """
    try:
        run_id, cluster_id = _extract_cluster_key(cluster_choice)
        if not run_id:
            return (
                _blank_cluster_chunks_table(),
                gr.update(choices=[], value=""),
                "",
                "⚠️ Pick a cluster from the list.",
                "",
            )

        tbl = fetch_cluster_chunks(run_id=run_id, cluster_id=cluster_id, limit=limit)
        if tbl and tbl[0] and tbl[0][0] == "error":
            reason = tbl[1][0] if len(tbl) > 1 and tbl[1] else "Cluster error"
            return tbl, gr.update(choices=[], value=""), "", "⚠️ " + reason, ""

        choices: List[str] = [
            _pack_choice(str(row[0]), str(row[5]))
            for row in tbl[1:]
            if row and len(row) >= 6
        ]
        first_choice = choices[0] if choices else ""
        return (
            tbl,
            gr.update(choices=choices, value=first_choice),
            _extract_uid(first_choice) if choices else "",
            f"✅ Loaded {len(choices)} chunks.",
            "",
        )
    except Exception as e:
        return (
            _blank_cluster_chunks_table(),
            gr.update(choices=[], value=""),
            "",
            f"⚠️ {type(e).__name__}: {e}",
            _fmt_debug(e),
        )
def ui_reload_meta():
    """"Reload meta" callback: return ``(meta table, status text, debug text)``."""
    try:
        meta_table = fetch_meta()
        return meta_table, "✅ Reloaded.", ""
    except Exception as e:
        return [["error"], ["failed"]], f"⚠️ {type(e).__name__}: {e}", _fmt_debug(e)
| # ----------------------------- | |
| # UI build | |
| # ----------------------------- | |
CSS = """
#app { max-width: 1100px; margin: 0 auto; }
h1,h2,h3 { margin-bottom: 0.4rem; }
.note { font-size: 0.95rem; opacity: 0.9; }
"""


def build_ui() -> gr.Blocks:
    """Assemble the Gradio Blocks app and wire its callbacks.

    Layout: a header and shared status/debug areas, then three tabs —
    "Search" (keyword search -> pick -> open), "Clusters" (load clusters ->
    load chunks -> pick -> open) and "About" (the meta table, loaded once
    here via ``fetch_meta`` and reloadable from the tab).
    """
    meta_table = fetch_meta()

    with gr.Blocks(title="Corpus Browser", css=CSS) as demo:
        gr.Markdown(
            f"""
# Corpus Browser

<span class="note">version: <code>{APP_VERSION}</code> — db: <code>{DB_PATH}</code></span>

**Use it like this:**
- **Search:** type words → Search → pick result → Open
- **Clusters:** Load clusters → pick one → Load chunks → pick chunk → Open
"""
        )

        # Shared by every tab: one status line and one collapsible debug pane.
        status = gr.Markdown("Ready.", elem_id="status")
        with gr.Accordion("Debug details", open=False):
            debug = gr.Markdown("")

        with gr.Tab("Search"):
            with gr.Row():
                q = gr.Textbox(label="Search words", placeholder="Type words to search", lines=2)
                search_btn = gr.Button("Search", variant="primary")

            with gr.Accordion("Advanced", open=False):
                with gr.Row():
                    limit_in = gr.Slider(1, 500, value=50, step=1, label="Max results")
                    cluster_in = gr.Textbox(label="Filter by cluster_id (optional)", placeholder="Leave blank")
                    ctx_window = gr.Slider(0, 12, value=3, step=1, label="Context window")

            gr.Markdown("### Results")
            result_pick = gr.Dropdown(choices=[], value="", label="Pick a result", interactive=True)
            uid_box = gr.Textbox(label="UID", placeholder="Auto-filled when you pick a result (or paste one)")
            open_btn = gr.Button("Open", variant="secondary")

            with gr.Row():
                text_out = gr.Textbox(label="Text", lines=18)

            with gr.Accordion("More details", open=False):
                meta_out = gr.Textbox(label="Meta", lines=10)
                ctx_tbl = _df(value=_blank_ctx_table(), label="Nearby chunks (context)", interactive=False, wrap=True)

            with gr.Accordion("Show table (power users)", open=False):
                results_tbl = _df(value=_blank_results_table(), label="Raw results table", interactive=False, wrap=True)

            search_btn.click(
                ui_search,
                inputs=[q, limit_in, cluster_in],
                outputs=[result_pick, uid_box, results_tbl, status, debug],
            )
            result_pick.change(ui_pick_result, inputs=[result_pick], outputs=[uid_box])
            open_btn.click(
                ui_open_uid,
                inputs=[uid_box, ctx_window],
                outputs=[meta_out, text_out, ctx_tbl, status, debug],
            )

        with gr.Tab("Clusters"):
            with gr.Row():
                load_clusters_btn = gr.Button("Load clusters", variant="primary")

            with gr.Accordion("Advanced", open=False):
                with gr.Row():
                    topn = gr.Slider(1, 2000, value=200, step=1, label="How many clusters to list")
                    sample_n = gr.Slider(1, 2000, value=150, step=1, label="How many chunks to list")
                    ctx_window2 = gr.Slider(0, 12, value=3, step=1, label="Context window")

            cluster_pick = gr.Dropdown(choices=[], value="", label="Pick a cluster", interactive=True)
            load_chunks_btn = gr.Button("Load chunks", variant="secondary")
            chunk_pick = gr.Dropdown(choices=[], value="", label="Pick a chunk", interactive=True)
            uid_box2 = gr.Textbox(label="UID", placeholder="Auto-filled when you pick a chunk (or paste one)")
            open_btn2 = gr.Button("Open", variant="secondary")

            with gr.Accordion("Show tables", open=False):
                cluster_tbl = _df(value=_blank_cluster_table(), label="Clusters table", interactive=False, wrap=True)
                chunk_tbl = _df(value=_blank_cluster_chunks_table(), label="Chunks table", interactive=False, wrap=True)

            with gr.Row():
                text_out2 = gr.Textbox(label="Text", lines=18)

            with gr.Accordion("More details", open=False):
                meta_out2 = gr.Textbox(label="Meta", lines=10)
                ctx_tbl2 = _df(value=_blank_ctx_table(), label="Nearby chunks (context)", interactive=False, wrap=True)

            load_clusters_btn.click(
                ui_load_clusters_all,
                inputs=[topn],
                outputs=[cluster_tbl, cluster_pick, status, debug],
            )
            load_chunks_btn.click(
                ui_load_cluster_chunks,
                inputs=[cluster_pick, sample_n],
                outputs=[chunk_tbl, chunk_pick, uid_box2, status, debug],
            )
            chunk_pick.change(ui_pick_result, inputs=[chunk_pick], outputs=[uid_box2])
            open_btn2.click(
                ui_open_uid,
                inputs=[uid_box2, ctx_window2],
                outputs=[meta_out2, text_out2, ctx_tbl2, status, debug],
            )

        with gr.Tab("About"):
            reload_meta_btn = gr.Button("Reload meta", variant="primary")
            meta_tbl = _df(value=meta_table, label="Meta", interactive=False, wrap=True)
            reload_meta_btn.click(
                ui_reload_meta,
                inputs=[],
                outputs=[meta_tbl, status, debug],
            )

    return demo
# Built at import time so the module exposes a top-level `demo` object.
demo = build_ui()
if __name__ == "__main__":
    # Listen on all interfaces; the port is read from PORT (default 7860).
    demo.launch(server_name="0.0.0.0", server_port=int(_env("PORT", "7860")))