Spaces:
Running
Running
| """ProcGrep explorer backend — query full agent-trajectory datasets, live. | |
| Intent: a small FastAPI service that ingests a Hugging Face trajectory dataset | |
| through procgrep, caches the canonicalized traces, and answers structural | |
| queries (for example "an edit streak of five or more with no test") over the | |
| whole dataset rather than a fixed 500-trace sample. Read this when changing the | |
| explorer's server behavior or its caching. | |
| Design decisions (benefit / price): | |
| 1. Import procgrep; do not reimplement canonicalization or queries. | |
| Benefit: improvements to the library flow here for free, and the static | |
| essay and this live backend share one definition of a "procedure". | |
| Price: the Space pins a procgrep revision and must be rebuilt to pick up | |
| library changes. | |
| 2. Cache canonicalized traces per dataset in memory under a small LRU. | |
| Benefit: the expensive step (ingest plus canonicalize) runs once, after | |
| which a query is a microsecond string scan. | |
| Price: the first query on a cold dataset pays the full ingest cost, and | |
| memory grows with the number of cached datasets (bounded by MAX_DATASETS). | |
| 3. Bound each ingest to MAX_TRACES with a timeout. | |
| Benefit: predictable latency and memory on the free CPU tier. | |
| Price: a very large dataset is sampled to the cap, not scanned in full; | |
| the response reports when this happens. | |
| 4. A query is a regular expression over the canonical atom spine, matching the | |
| static essay's query box exactly. | |
| Benefit: one query language across the paper and the live demo. | |
| Price: the spine drops argument-level detail, by design in procgrep. | |
| 5. Prefer a precomputed spine store (HF dataset midah/procgrep-spines) over | |
| live ingest, falling back to live ingest when the store is missing a dataset | |
| or cannot be reached. | |
| Benefit: warm datasets answer instantly with no per-query streaming or | |
| canonicalization, and coverage can grow on CI rather than at request time. | |
| Price: store-backed datasets are only as fresh as the last refresh build; | |
| the weekly action keeps them current. | |
| 6. The comparator (/compare) reuses the library's lineage primitives | |
| (fit_bpe, encode, discriminative_procedures, jsd) rather than reinventing | |
| them, and bounds each side to CMP_SAMPLE traces for the BPE/diff pass. | |
| Benefit: the side-by-side diff means exactly what the paper's lineage_diff | |
| means, and the heavy step stays bounded for interactive latency. | |
| Price: the discriminative-procedure ranking sees a sample, not all traces, | |
| on very large groups; the rendered trail stack is a further CMP_STACK cap. | |
| 7. Cache /compare results by their key, and pre-warm each axis's default | |
| landing pair (agent, eval, outcome) in a background thread at startup. | |
| Benefit: the comparator's landing view answers instantly instead of paying | |
| the first BPE pass live; repeat toggles are free. | |
| Price: a bounded amount of memory for the cache, and the warm-up does one | |
| BPE pass at startup off the request path. | |
| """ | |
| from __future__ import annotations | |
| import contextlib | |
| import math | |
| import re | |
| import threading | |
| import time | |
| from collections import Counter, OrderedDict | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from statistics import median | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from procgrep import ( | |
| Trace, | |
| discriminative_procedures, | |
| encode, | |
| fit_bpe, | |
| jsd, | |
| ) | |
| from procgrep.ingest import ingest | |
| from procgrep.types import PROCEDURE_SEPARATOR | |
# Configuration.
# Per-dataset live-ingest cap (design decision 3); raise it as memory allows.
MAX_TRACES = 20_000
# Datasets held in the in-memory cache before LRU eviction (design decision 2).
MAX_DATASETS = 6
# Timeout, in seconds, passed to procgrep's ingest for one live load.
INGEST_TIMEOUT_S = 60.0
# Matched traces returned to the client per query page.
HIT_SAMPLE = 50
STATIC = Path(__file__).parent / "static"

# Precomputed spine store on the Hugging Face Hub (design decision 5).
SPINE_REPO = "midah/procgrep-spines"
SPINE_FILE = "procgrep_spines.parquet"

# Comparator bounds (design decisions 6 and 7).
CMP_SAMPLE = 1_500  # traces per side fed to BPE + discriminative_procedures
CMP_STACK = 30  # trails rendered per side
CMP_VOCAB = 200  # BPE procedure-vocabulary size for the diff
CMP_TRAIL_CAP = 140  # atoms per rendered trail (fits the column at 3px/cell)
COMPARE_CACHE_MAX = 24  # cached /compare results before LRU eviction

# A short, curated starting set; the client may query any dataset id.
SUGGESTED = (
    "nebius/SWE-agent-trajectories",
    "ElenaFu/SWE-agent-trajectories",
    "nebius/SWE-rebench-openhands-trajectories",
    "SWE-bench/SWE-smith-trajectories",
    "nvidia/Nemotron-SFT-Agentic-v2",
    "AlienKevin/SWE-ZERO-12M-trajectories",
)
# Canonical atom -> hex color palette. Keep it in sync with the static
# explorer's palette so live and static views color trails identically.
ATOM_COLOR = dict(
    search_repo="#585E53",
    read_file="#5692E5",
    edit="#CB4D20",
    create_file="#3D7AD8",
    run_test="#20A380",
    submit="#14110E",
    think="#b7b1a7",
    localize="#8C1040",
    delete_file="#A03D18",
    error="#B4184F",
    other="#d9d4cc",
)
@dataclass(frozen=True)
class CachedTrace:
    """One canonicalized trajectory, with its atom spine pre-joined for scans.

    Instances are built positionally
    (``CachedTrace(trace_id, agent, atoms, spine, task, outcome, cot_tokens)``),
    so the class must be a dataclass to get a generated ``__init__``. It is
    frozen because the same instances are shared between the precomputed spine
    store and the per-dataset cache, so nothing should mutate them in place.
    """

    trace_id: str
    agent: str
    atoms: tuple[str, ...]
    spine: str  # " ".join(atoms) + " ", so `(edit ){5,}` style regexes match
    task: str = ""  # the task/instance the trajectory solved (trace.group), if known
    outcome: str = ""  # "resolved"/"unresolved"/"" from a genuine resolution label, when present
    cot_tokens: int = 0  # approx tokens the model generated (reasoning volume), chars/4
# Per-dataset metadata: dataset id -> {adapter, n_traces, truncated, **stats}.
_META: dict[str, dict] = {}
# Live per-dataset cache: dataset id -> canonicalized traces. An OrderedDict
# keeps LRU order cheaply (move_to_end on a hit, popitem(last=False) to evict).
_CACHE: OrderedDict[str, list[CachedTrace]] = OrderedDict()

# Precomputed spine store (design decision 5), loaded lazily and shared.
# None means "not attempted yet"; an empty dict means "attempted, nothing
# usable", so every dataset falls through to live ingest.
_STORE: dict[str, list[CachedTrace]] | None = None

# /compare payloads keyed by (axis, dataset, left, right) (design decision 7).
_COMPARE_CACHE: OrderedDict[tuple[str, str, str, str], dict] = OrderedDict()
def _is_missing(value: object) -> bool:
    """Return True for an absent store cell: None, NaN/NaT, or pandas' ``pd.NA``."""
    if value is None:
        return True
    try:
        # NaN and NaT are the only scalars that compare unequal to themselves.
        return bool(value != value)
    except (TypeError, ValueError):
        # pd.NA compares to NA, and bool(NA) raises TypeError.
        return True


def _cell_str(value: object) -> str:
    """Coerce an optional store cell to ``str``, mapping missing values to ""."""
    return "" if _is_missing(value) else str(value)


def _cell_int(value: object) -> int:
    """Coerce an optional store cell to ``int``, mapping missing values to 0."""
    return 0 if _is_missing(value) else int(value)


def _load_store() -> dict[str, list[CachedTrace]]:
    """Download and parse the precomputed spine store, grouped by dataset.

    The result, including an empty one, is memoized in ``_STORE``, so the
    download is attempted only while ``_STORE`` is still None.

    Atoms are rebuilt from the space-joined spine (lossless, because atoms carry
    no internal spaces). The spine is then re-joined as ``" ".join(atoms) + " "``,
    so store-backed traces have exactly the spine format that live ingest
    produces and trailing-space regexes like ``(edit ){5,}`` behave the same.

    Optional columns (task, outcome, cot_tokens) may be absent, as in older
    stores, or hold missing values (None/NaN/pd.NA). Those map to ""/0.
    Previously a single NaN ``cot_tokens`` made ``int()`` raise, which silently
    discarded the whole store, and a missing task became the string "nan".

    Any other failure (no repo, offline, bad file) yields an empty store, so
    callers transparently fall back to live ingest.
    """
    global _STORE
    if _STORE is not None:
        return _STORE
    try:
        import pandas as pd
        from huggingface_hub import hf_hub_download

        path = hf_hub_download(SPINE_REPO, SPINE_FILE, repo_type="dataset")
        df = pd.read_parquet(path)
        store: dict[str, list[CachedTrace]] = {}
        for row in df.itertuples(index=False):
            atoms = tuple(_cell_str(row.spine).split())
            store.setdefault(str(row.dataset), []).append(
                CachedTrace(
                    str(row.trace_id),
                    str(row.agent),
                    atoms,
                    " ".join(atoms) + " ",  # same spine contract as live ingest
                    _cell_str(getattr(row, "task", "")),
                    _cell_str(getattr(row, "outcome", "")),
                    _cell_int(getattr(row, "cot_tokens", 0)),
                )
            )
        _STORE = store
    except Exception:  # best-effort by design: any failure means "use live ingest"
        _STORE = {}
    return _STORE
def _remember(dataset: str, traces: list[CachedTrace], adapter: str, truncated: bool) -> None:
    """Cache ``traces`` as the most recent entry, record its metadata, then evict.

    Datasets beyond MAX_DATASETS are dropped least-recently-used first, together
    with their ``_META`` entries (design decision 2).
    """
    _CACHE[dataset] = traces
    _CACHE.move_to_end(dataset)  # most recent even if a concurrent caller inserted it first
    _META[dataset] = {
        "adapter": adapter,
        "n_traces": len(traces),
        "truncated": truncated,
        **_stats(traces),
    }
    while len(_CACHE) > MAX_DATASETS:
        evicted, _ = _CACHE.popitem(last=False)
        _META.pop(evicted, None)


def _load(dataset: str) -> list[CachedTrace]:
    """Return canonicalized traces for ``dataset``.

    Lookup order:
      1. the in-memory LRU cache (a hit is bumped to most-recent);
      2. the precomputed spine store (design decision 5);
      3. live ingest, bounded by MAX_TRACES and INGEST_TIMEOUT_S (decision 3).
    Results from 2 and 3 are cached under an LRU of size MAX_DATASETS
    (decision 2). Exceptions raised by live ingest propagate to the caller.
    """
    cached = _CACHE.get(dataset)
    if cached is not None:
        # _load also runs on the startup warm-up thread, so a concurrent caller
        # may evict the entry between get() and the LRU bump. The traces we
        # already hold are still valid, so do not fail on that.
        with contextlib.suppress(KeyError):
            _CACHE.move_to_end(dataset)
        return cached

    store = _load_store()
    if dataset in store:
        cached = store[dataset]
        _remember(dataset, cached, "spine-store", truncated=False)
        return cached

    traces, plan = ingest(dataset, limit=MAX_TRACES, timeout=INGEST_TIMEOUT_S)
    cached = [
        CachedTrace(
            t.trace_id, t.agent, tuple(t.atoms), " ".join(t.atoms) + " ", str(t.group or "")
        )
        for t in traces
    ]
    # NOTE(review): only hitting the MAX_TRACES cap is reported as truncation.
    # If ingest stops early on its timeout, that is not flagged here; confirm
    # whether `plan` exposes it.
    _remember(dataset, cached, plan.adapter, truncated=len(cached) >= MAX_TRACES)
    return cached
def _stats(traces: list[CachedTrace]) -> dict[str, float | int]:
    """Summarize a group of traces with a few quick statistics.

    Keys returned for a non-empty group:
      diversity_bits     mean per-trace Shannon entropy (bits) of its atoms
      median_len         median trace length in atoms (conciseness)
      median_cot         median number of ``think`` atoms per trace
      median_cot_tokens  median of each trace's ``cot_tokens``
      exact_dup_rate     fraction of traces whose spine repeats an earlier one
      n_models           number of distinct agents
    An empty group yields only ``{"n_models": 0}``.
    """
    if not traces:
        return {"n_models": 0}

    def entropy(atoms: tuple[str, ...]) -> float:
        total = len(atoms) or 1  # an empty trace scores 0 bits, not a division error
        return -sum((c / total) * math.log2(c / total) for c in Counter(atoms).values())

    lengths = [len(t.atoms) for t in traces]
    think_counts = [sum(a == "think" for a in t.atoms) for t in traces]
    entropies = [entropy(t.atoms) for t in traces]
    distinct_spines = {t.spine for t in traces}
    return {
        "diversity_bits": round(sum(entropies) / len(entropies), 2),
        "median_len": int(median(lengths)),
        "median_cot": int(median(think_counts)),
        "median_cot_tokens": int(median([t.cot_tokens for t in traces])),
        "exact_dup_rate": round(1 - len(distinct_spines) / len(traces), 3),
        "n_models": len({t.agent for t in traces}),
    }
def _action_mix(traces: list[CachedTrace]) -> dict[str, float]:
    """Return each atom's share of all atoms across ``traces``.

    Shares are rounded to 4 decimals and ordered from most to least frequent
    (ties keep first-seen order). No atoms at all yields ``{}``.
    """
    tally: Counter[str] = Counter()
    for trace in traces:
        tally.update(trace.atoms)
    denominator = sum(tally.values()) or 1
    return {atom: round(count / denominator, 4) for atom, count in tally.most_common()}
# API.
app = FastAPI(title="ProcGrep explorer", docs_url="/api")

# Origins allowed to call the live query/compare API cross-origin: the
# published Pages explorer (so the static site can run example queries over
# whole datasets, not just its embedded samples) plus local dev servers.
# The endpoints are read-only JSON; no credentials are involved.
_CORS_ORIGINS = [
    "https://hamidah.me",
    "http://localhost:8000",
    "http://127.0.0.1:8000",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ORIGINS,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
# Request body for a structural query: which dataset, which regex, which page
# of the matched traces.
class QueryRequest(BaseModel):
    dataset: str = SUGGESTED[0]
    # Regular expression searched in each trace's space-joined atom spine.
    pattern: str
    # Start index into the matched traces, used to page through them ("show more").
    offset: int = 0
def datasets() -> JSONResponse:
    """List datasets for the picker, plus cache state and per-dataset metadata.

    ``suggested`` puts store-backed datasets first, because they answer
    instantly from the precomputed spine store. The curated SUGGESTED list
    follows, with duplicates removed. ``outcome_datasets`` names the store
    datasets in which at least one trace carries a genuine resolved/unresolved
    label, i.e. those the outcome comparison axis applies to.
    """
    store = _load_store()
    labels = ("resolved", "unresolved")
    outcome_datasets = [
        dataset_id
        for dataset_id, traces in store.items()
        if any(t.outcome in labels for t in traces)
    ]
    ordered_ids = dict.fromkeys(list(store) + list(SUGGESTED))
    payload = {
        "suggested": list(ordered_ids),
        "cached": list(_CACHE),
        "meta": _META,
        "outcome_datasets": outcome_datasets,
    }
    return JSONResponse(payload)
def query(req: QueryRequest) -> JSONResponse:
    """Run a structural regex over a whole dataset's atom spines.

    Returns the match count, per-model match rates (highest first), the
    matched-vs-all action mix, and a page of up to HIT_SAMPLE matched traces
    starting at ``req.offset``. A negative offset is clamped to 0. Errors (bad
    pattern, ingest failure) come back as a JSON ``error`` with status 200
    rather than a 500, so the client can show them.
    """
    # NOTE(review): ``pattern`` is client-supplied, and Python's ``re`` has no
    # timeout, so a catastrophically backtracking regex such as ``(a+)+b`` can
    # occupy a worker for a long time. Consider a pattern-length cap or a regex
    # engine with a timeout.
    try:
        rx = re.compile(req.pattern)
    except re.error as exc:
        return JSONResponse({"error": f"invalid pattern: {exc}"}, status_code=200)
    try:
        traces = _load(req.dataset)
    except Exception as exc:
        return JSONResponse({"error": f"{type(exc).__name__}: {exc}"}, status_code=200)

    t0 = time.perf_counter()
    hits = [t for t in traces if rx.search(t.spine)]
    elapsed_ms = (time.perf_counter() - t0) * 1e3

    by_model: dict[str, dict[str, int]] = {}
    for t in traces:
        by_model.setdefault(t.agent, {"n": 0, "hits": 0})["n"] += 1
    for t in hits:
        by_model[t.agent]["hits"] += 1
    models = sorted(
        (
            {"model": m, "rate": round(c["hits"] / c["n"], 4), "n": c["n"]}
            for m, c in by_model.items()
        ),
        key=lambda r: -r["rate"],
    )

    # A negative offset would make the slice below index from the end of the
    # hit list (or come back empty), so clamp it to the first page.
    offset = max(req.offset, 0)
    meta = _META.get(req.dataset, {})
    stat_keys = (
        "diversity_bits",
        "median_len",
        "median_cot",
        "median_cot_tokens",
        "exact_dup_rate",
        "n_models",
    )
    return JSONResponse(
        {
            "dataset": req.dataset,
            "pattern": req.pattern,
            "n_traces": len(traces),
            "n_hits": len(hits),
            "elapsed_ms": round(elapsed_ms, 2),
            "truncated": meta.get("truncated", False),
            "stats": {k: meta.get(k) for k in stat_keys},
            "by_model": models,
            "mix_all": _action_mix(traces),
            "mix_hits": _action_mix(hits) if hits else {},
            "atom_color": ATOM_COLOR,
            "n_shown_from": offset,
            "hits": [
                {
                    "trace_id": t.trace_id,
                    "model": t.agent,
                    "task": t.task,
                    "outcome": t.outcome,
                    "cot_tokens": t.cot_tokens,
                    "steps": len(t.atoms),
                    "atoms": list(t.atoms[:200]),
                }
                for t in hits[offset : offset + HIT_SAMPLE]
            ],
        }
    )
class CompareRequest(BaseModel):
    """A side-by-side comparison along one axis.

    axis="agent": ``dataset`` is one dataset (store-backed or live-ingested);
    ``left``/``right`` are agent names within it.
    axis="eval": ``left``/``right`` are dataset ids (``dataset`` is ignored).
    axis="outcome": ``left``/``right`` are outcome labels ("resolved" /
    "unresolved") within ``dataset``. Only datasets carrying a genuine
    resolution label yield non-empty sides.
    """

    axis: str = "agent"
    dataset: str = SUGGESTED[0]
    left: str
    right: str
def _eval_label(dataset_id: str) -> str:
    """Short display label for a dataset id on the eval axis.

    Keeps the org so same-basename datasets stay distinct (e.g.
    nebius/SWE-agent vs ElenaFu/SWE-agent) and drops a trailing
    ``-trajectories`` from the name. An id without an org is labeled by its
    shortened name alone, instead of the old dangling ``"name/"``.
    """
    org, sep, name = dataset_id.partition("/")
    if not sep:  # no "/" at all: the whole id is the name
        org, name = "", dataset_id
    suffix = "-trajectories"
    short = name[: -len(suffix)] if name.endswith(suffix) else name
    short = short or name  # never reduce a name to ""
    return f"{org}/{short}" if org else short


def _side_traces(axis: str, dataset: str, value: str) -> tuple[list[CachedTrace], str]:
    """Resolve one side of a comparison to its traces and a display label.

    axis="eval":    ``value`` is a dataset id; all its traces, labeled by _eval_label.
    axis="agent":   traces in ``dataset`` whose agent is ``value``.
    axis="outcome": traces in ``dataset`` whose outcome is ``value``
                    ("resolved"/"unresolved"); datasets without a genuine
                    resolution label yield an empty side.

    Raises:
        ValueError: for any other axis. Errors from loading a dataset propagate.
    """
    if axis == "eval":
        return _load(value), _eval_label(value)
    if axis == "agent":
        return [t for t in _load(dataset) if t.agent == value], value
    if axis == "outcome":
        return [t for t in _load(dataset) if t.outcome == value], value
    raise ValueError(f"unsupported axis: {axis}")
def _mix_vector(
    mix_a: dict[str, float], mix_b: dict[str, float]
) -> tuple[list[float], list[float]]:
    """Project two action mixes onto one shared, sorted atom alphabet for JSD.

    An atom present in only one mix gets probability 0.0 in the other.
    """
    alphabet = sorted(mix_a.keys() | mix_b.keys())
    vec_a: list[float] = []
    vec_b: list[float] = []
    for atom in alphabet:
        vec_a.append(mix_a.get(atom, 0.0))
        vec_b.append(mix_b.get(atom, 0.0))
    return vec_a, vec_b
def _side_summary(traces: list[CachedTrace], label: str) -> dict:
    """Summarize one comparator side: its trail stack, quick stats and action mix.

    The stack picks up to CMP_STACK non-empty traces spread evenly across the
    length distribution (shortest to longest), rather than the longest N, so it
    fingerprints the whole group instead of its tail. Each rendered trail
    reports its true ``steps`` but carries only its first CMP_TRAIL_CAP atoms.
    """
    by_length = sorted((t for t in traces if t.atoms), key=lambda t: len(t.atoms))
    if len(by_length) <= CMP_STACK:
        stack = by_length
    else:
        step = len(by_length) / CMP_STACK
        stack = [by_length[int(i * step)] for i in range(CMP_STACK)]

    def trail(t: CachedTrace) -> dict:
        return {
            "trace_id": t.trace_id,
            "model": t.agent,
            "task": t.task,
            "outcome": t.outcome,
            "cot_tokens": t.cot_tokens,
            "steps": len(t.atoms),  # true length; atoms below are capped for rendering
            "atoms": list(t.atoms[:CMP_TRAIL_CAP]),
        }

    return {
        "label": label,
        "n": len(traces),
        "stats": _stats(traces),
        "mix": _action_mix(traces),
        "trails": [trail(t) for t in stack],
    }
def _diff_procedures(sample: list, left_label: str, right_label: str) -> list[dict]:
    """Top-10 BPE procedures separating the two labeled groups in ``sample``.

    Best-effort: any failure yields ``[]``, so the trail stacks still render
    without the diff.
    """
    try:
        vocab = fit_bpe((t.atoms for t in sample), vocab_size=CMP_VOCAB)
        fingerprints = encode(sample, vocab=vocab)
        return [
            {
                "atoms": d.procedure.split(PROCEDURE_SEPARATOR),
                "log_odds": round(d.log_odds, 3),
                "p_left": round(d.p_a, 4),
                "p_right": round(d.p_b, 4),
            }
            for d in discriminative_procedures(
                fingerprints, vocab, group_a=left_label, group_b=right_label, k=10
            )
        ]
    except Exception:
        return []


def _compute_compare(axis: str, dataset: str, left_value: str, right_value: str) -> dict:
    """Build the side-by-side diff payload, or an ``{"error": ...}`` dict.

    The payload holds the two trail stacks and a diff strip. The diff strip has
    the top procedures distinguishing the sides (signed log-odds, positive
    favors the left), the action-mix JSD, and the median length/CoT deltas.
    The BPE and discriminative pass reuses the library on at most CMP_SAMPLE
    traces per side (design decision 6).
    """
    try:
        left, left_label = _side_traces(axis, dataset, left_value)
        right, right_label = _side_traces(axis, dataset, right_value)
    except Exception as exc:
        return {"error": f"{type(exc).__name__}: {exc}"}
    if left_label == right_label:
        return {"error": "pick two different groups"}
    if not left or not right:
        empty_label = right_label if left else left_label
        return {"error": f"no traces for {empty_label!r}"}

    started = time.perf_counter()
    # Labeled Traces (group = side) for the bounded BPE + discriminative pass.
    sample = [Trace(t.trace_id, t.agent, t.atoms, group=left_label) for t in left[:CMP_SAMPLE]]
    sample += [Trace(t.trace_id, t.agent, t.atoms, group=right_label) for t in right[:CMP_SAMPLE]]
    procedures = _diff_procedures(sample, left_label, right_label)

    left_side = _side_summary(left, left_label)
    right_side = _side_summary(right, right_label)
    p_left, p_right = _mix_vector(left_side["mix"], right_side["mix"])
    elapsed_ms = (time.perf_counter() - started) * 1e3

    left_stats, right_stats = left_side["stats"], right_side["stats"]
    diff = {
        "procedures": procedures,
        "jsd": round(jsd(p_left, p_right), 4) if p_left else None,
        "len_delta": left_stats.get("median_len", 0) - right_stats.get("median_len", 0),
        "cot_delta": left_stats.get("median_cot", 0) - right_stats.get("median_cot", 0),
    }
    return {
        "axis": axis,
        "left": left_side,
        "right": right_side,
        "diff": diff,
        "atom_color": ATOM_COLOR,
        "elapsed_ms": round(elapsed_ms, 2),
    }
def compare(req: CompareRequest) -> JSONResponse:
    """Serve a side-by-side diff, memoized by (axis, dataset, left, right).

    A hit is returned with ``"cached": True`` and bumped to most-recent. A miss
    is computed by _compute_compare. Only successful payloads are cached
    (errors are recomputed next time), and the cache is bounded to
    COMPARE_CACHE_MAX entries in LRU order. The default landing pairs are
    pre-warmed at startup (design decision 7), so the comparator answers
    instantly rather than paying the first BPE pass live.
    """
    key = (req.axis, req.dataset, req.left, req.right)
    cached = _COMPARE_CACHE.get(key)
    if cached is not None:
        # The startup warm-up thread also writes this cache, and handlers may run
        # concurrently. So the entry can be evicted between get() and the LRU
        # bump; the payload we hold is still valid, so do not fail on that.
        with contextlib.suppress(KeyError):
            _COMPARE_CACHE.move_to_end(key)
        return JSONResponse({**cached, "cached": True})
    payload = _compute_compare(req.axis, req.dataset, req.left, req.right)
    if "error" not in payload:
        _COMPARE_CACHE[key] = payload
        while len(_COMPARE_CACHE) > COMPARE_CACHE_MAX:
            _COMPARE_CACHE.popitem(last=False)
    return JSONResponse(payload)
def _warm_default_compare() -> None:
    """Cache the comparator's default landing pair for every axis (agent, eval,
    outcome), mirroring the frontend defaults so each axis lands instantly.

    Intended to run on a background thread so it never blocks startup. Every
    step is best-effort: a failure just leaves that axis cold. Like compare(),
    it caches only successful payloads, because an error cached here would be
    served forever with ``"cached": True``. It also keeps the cache within
    COMPARE_CACHE_MAX.
    """
    try:
        store = _load_store()
        dataset_ids = list(store.keys()) or list(SUGGESTED)
    except Exception:
        return

    def warm(axis: str, dataset: str, left: str, right: str) -> None:
        key = (axis, dataset, left, right)
        if key in _COMPARE_CACHE:
            return
        try:
            payload = _compute_compare(axis, dataset, left, right)
        except Exception:
            return
        if "error" in payload:
            return
        _COMPARE_CACHE[key] = payload
        while len(_COMPARE_CACHE) > COMPARE_CACHE_MAX:
            _COMPARE_CACHE.popitem(last=False)

    # agent: first dataset, its two most common agents (frontend default).
    with contextlib.suppress(Exception):
        top = Counter(t.agent for t in _load(dataset_ids[0])).most_common(2)
        if len(top) >= 2:
            warm("agent", dataset_ids[0], top[0][0], top[1][0])

    # eval: first two datasets.
    if len(dataset_ids) >= 2:
        warm("eval", dataset_ids[0], dataset_ids[0], dataset_ids[1])

    # outcome: first store dataset that carries a resolution label, resolved vs unresolved.
    outcome_ds = [
        d for d, ts in store.items() if any(t.outcome in ("resolved", "unresolved") for t in ts)
    ]
    if outcome_ds:
        warm("outcome", outcome_ds[0], "resolved", "unresolved")
def _warm_on_startup() -> None:
    """Start _warm_default_compare on a daemon thread and return immediately.

    As a daemon, the warm-up thread does not keep the process alive at shutdown.
    """
    warmer = threading.Thread(target=_warm_default_compare, daemon=True)
    warmer.start()
def groups(dataset: str = SUGGESTED[0]) -> JSONResponse:
    """List the agents in ``dataset`` with their trace counts, most common first.

    Feeds the comparator's agent picker. A load failure comes back as a JSON
    ``error`` with status 200 instead of a 500.
    """
    try:
        traces = _load(dataset)
    except Exception as exc:
        return JSONResponse({"error": f"{type(exc).__name__}: {exc}"}, status_code=200)
    per_agent = Counter(t.agent for t in traces).most_common()
    agents = [{"agent": name, "n": count} for name, count in per_agent]
    return JSONResponse({"dataset": dataset, "agents": agents})
def index() -> FileResponse:
    """Serve the explorer's landing page, ``static/index.html``."""
    page = STATIC / "index.html"
    return FileResponse(page)


# Serve the rest of the STATIC directory under /static.
app.mount("/static", StaticFiles(directory=STATIC), name="static")