midah's picture
Consolidated explorer shell on the Space: sidebar nav (eval/sessions toggle), bundled demo session, footer cross-nav to essay/arXiv/github; relative same-origin fetch; +2 datasets in SUGGESTED
c868574 verified
Raw
History Blame Contribute Delete
22.9 kB
"""ProcGrep explorer backend — query full agent-trajectory datasets, live.
Intent: a small FastAPI service that ingests a Hugging Face trajectory dataset
through procgrep, caches the canonicalized traces, and answers structural
queries (for example "an edit streak of five or more with no test") over the
whole dataset rather than a fixed 500-trace sample. Read this when changing the
explorer's server behavior or its caching.
Design decisions (benefit / price):
1. Import procgrep; do not reimplement canonicalization or queries.
Benefit: improvements to the library flow here for free, and the static
essay and this live backend share one definition of a "procedure".
Price: the Space pins a procgrep revision and must be rebuilt to pick up
library changes.
2. Cache canonicalized traces per dataset in memory under a small LRU.
Benefit: the expensive step (ingest plus canonicalize) runs once, after
which a query is a microsecond string scan.
Price: the first query on a cold dataset pays the full ingest cost, and
memory grows with the number of cached datasets (bounded by MAX_DATASETS).
3. Bound each ingest to MAX_TRACES with a timeout.
Benefit: predictable latency and memory on the free CPU tier.
Price: a very large dataset is sampled to the cap, not scanned in full;
the response reports when this happens.
4. A query is a regular expression over the canonical atom spine, matching the
static essay's query box exactly.
Benefit: one query language across the paper and the live demo.
Price: the spine drops argument-level detail, by design in procgrep.
5. Prefer a precomputed spine store (HF dataset midah/procgrep-spines) over
live ingest, falling back to live ingest when the store is missing a dataset
or cannot be reached.
Benefit: warm datasets answer instantly with no per-query streaming or
canonicalization, and coverage can grow on CI rather than at request time.
Price: store-backed datasets are only as fresh as the last refresh build;
the weekly action keeps them current.
6. The comparator (/compare) reuses the library's lineage primitives
(fit_bpe, encode, discriminative_procedures, jsd) rather than reinventing
them, and bounds each side to CMP_SAMPLE traces for the BPE/diff pass.
Benefit: the side-by-side diff means exactly what the paper's lineage_diff
means, and the heavy step stays bounded for interactive latency.
Price: the discriminative-procedure ranking sees a sample, not all traces,
on very large groups; the rendered trail stack is a further CMP_STACK cap.
7. Cache /compare results by their key, and pre-warm the default landing pair
in a background thread at startup.
Benefit: the comparator's landing view answers instantly instead of paying
the first BPE pass live; repeat toggles are free.
Price: a bounded amount of memory for the cache, and the warm-up does one
BPE pass at startup off the request path.
"""
from __future__ import annotations
import contextlib
import math
import re
import threading
import time
from collections import Counter, OrderedDict
from dataclasses import dataclass
from pathlib import Path
from statistics import median
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from procgrep import (
Trace,
discriminative_procedures,
encode,
fit_bpe,
jsd,
)
from procgrep.ingest import ingest
from procgrep.types import PROCEDURE_SEPARATOR
# Configuration.
MAX_TRACES = 20000 # per-dataset ingest cap (design decision 3); raise as memory allows
MAX_DATASETS = 6 # cached datasets before LRU eviction (design decision 2)
INGEST_TIMEOUT_S = 60.0
HIT_SAMPLE = 50 # matched traces returned to the client
STATIC = Path(__file__).parent / "static"
SPINE_REPO = "midah/procgrep-spines" # precomputed store (design decision 5)
SPINE_FILE = "procgrep_spines.parquet"
CMP_SAMPLE = 1500 # traces per side fed to BPE + discriminative_procedures (design decision 6)
CMP_STACK = 30 # trails rendered per side in the comparator
CMP_VOCAB = 200 # BPE procedure-vocabulary size for the diff
CMP_TRAIL_CAP = 140 # atoms per rendered trail (fits the column at 3px/cell)
COMPARE_CACHE_MAX = 24 # cached /compare results before LRU eviction (design decision 7)
# A short, curated starting set; the client may query any dataset id.
SUGGESTED = (
"nebius/SWE-agent-trajectories",
"ElenaFu/SWE-agent-trajectories",
"nebius/SWE-rebench-openhands-trajectories",
"SWE-bench/SWE-smith-trajectories",
"nvidia/Nemotron-SFT-Agentic-v2",
"AlienKevin/SWE-ZERO-12M-trajectories",
)
# Canonical atom palette (kept in sync with the static explorer).
ATOM_COLOR = {
"search_repo": "#585E53",
"read_file": "#5692E5",
"edit": "#CB4D20",
"create_file": "#3D7AD8",
"run_test": "#20A380",
"submit": "#14110E",
"think": "#b7b1a7",
"localize": "#8C1040",
"delete_file": "#A03D18",
"error": "#B4184F",
"other": "#d9d4cc",
}
@dataclass(frozen=True)
class CachedTrace:
"""One canonicalized trajectory, with its atom spine pre-joined for scans."""
trace_id: str
agent: str
atoms: tuple[str, ...]
spine: str # " ".join(atoms) + " ", so `(edit ){5,}` style regexes match
task: str = "" # the task/instance the trajectory solved (trace.group), if known
outcome: str = "" # "resolved"/"unresolved"/"" from a genuine resolution label, when present
cot_tokens: int = 0 # approx tokens the model generated (reasoning volume), chars/4
# dataset id -> traces. OrderedDict gives us LRU order cheaply.
_CACHE: OrderedDict[str, list[CachedTrace]] = OrderedDict()
_META: dict[str, dict] = {} # dataset id -> {adapter, n_traces, truncated, n_models}
# Precomputed spine store, loaded once and shared (design decision 5). None until
# the first load attempt; an empty dict means "tried, nothing usable" so every
# dataset falls through to live ingest.
_STORE: dict[str, list[CachedTrace]] | None = None
# Cached /compare payloads keyed by (axis, dataset, left, right) (design decision 7).
_COMPARE_CACHE: OrderedDict[tuple[str, str, str, str], dict] = OrderedDict()
def _load_store() -> dict[str, list[CachedTrace]]:
"""Download and parse the precomputed spine store, grouped by dataset.
Reconstructs atoms from the space-joined spine (lossless: atoms carry no
internal spaces). Any failure (no repo, offline, bad file) yields an empty
store so callers transparently fall back to live ingest.
"""
global _STORE
if _STORE is not None:
return _STORE
try:
import pandas as pd
from huggingface_hub import hf_hub_download
path = hf_hub_download(SPINE_REPO, SPINE_FILE, repo_type="dataset")
df = pd.read_parquet(path)
store: dict[str, list[CachedTrace]] = {}
for row in df.itertuples(index=False):
spine = str(row.spine)
atoms = tuple(spine.split())
# outcome + cot_tokens are optional: older stores lack them, read defensively.
outcome = str(getattr(row, "outcome", "") or "")
cot = int(getattr(row, "cot_tokens", 0) or 0)
store.setdefault(str(row.dataset), []).append(
CachedTrace(
str(row.trace_id), str(row.agent), atoms, spine, str(row.task), outcome, cot
)
)
_STORE = store
except Exception:
_STORE = {}
return _STORE
def _load(dataset: str) -> list[CachedTrace]:
"""Return canonicalized traces for ``dataset``.
Prefers the precomputed spine store (design decision 5); on a store miss,
ingests live, bounded by MAX_TRACES and a timeout and cached under an LRU of
size MAX_DATASETS (design decisions 2 and 3).
"""
if dataset in _CACHE:
_CACHE.move_to_end(dataset)
return _CACHE[dataset]
store = _load_store()
if dataset in store:
cached = store[dataset]
_CACHE[dataset] = cached
_META[dataset] = {
"adapter": "spine-store",
"n_traces": len(cached),
"truncated": False,
**_stats(cached),
}
while len(_CACHE) > MAX_DATASETS:
evicted, _ = _CACHE.popitem(last=False)
_META.pop(evicted, None)
return cached
traces, plan = ingest(dataset, limit=MAX_TRACES, timeout=INGEST_TIMEOUT_S)
cached = [
CachedTrace(
t.trace_id, t.agent, tuple(t.atoms), " ".join(t.atoms) + " ", str(t.group or "")
)
for t in traces
]
_CACHE[dataset] = cached
_META[dataset] = {
"adapter": plan.adapter,
"n_traces": len(cached),
"truncated": len(cached) >= MAX_TRACES,
**_stats(cached),
}
while len(_CACHE) > MAX_DATASETS:
evicted, _ = _CACHE.popitem(last=False)
_META.pop(evicted, None)
return cached
def _stats(traces: list[CachedTrace]) -> dict[str, float | int]:
"""Quick dataset stats: behavioral diversity, conciseness, CoT length, dups.
diversity_bits = mean per-trajectory action entropy (how varied each agent's
actions are); median_len = median trace length (conciseness); median_cot =
median count of `think` steps per trace (chain-of-thought length).
"""
if not traces:
return {"n_models": 0}
lens = [len(t.atoms) for t in traces]
thinks = [sum(a == "think" for a in t.atoms) for t in traces]
ents: list[float] = []
for t in traces:
counts = Counter(t.atoms)
n = len(t.atoms) or 1
ents.append(-sum((k / n) * math.log2(k / n) for k in counts.values()))
uniq = len({t.spine for t in traces})
return {
"diversity_bits": round(sum(ents) / len(ents), 2),
"median_len": int(median(lens)),
"median_cot": int(median(thinks)),
"median_cot_tokens": int(median([t.cot_tokens for t in traces])),
"exact_dup_rate": round(1 - uniq / len(traces), 3),
"n_models": len({t.agent for t in traces}),
}
def _action_mix(traces: list[CachedTrace]) -> dict[str, float]:
"""Normalized frequency of each atom across the given traces."""
counts: Counter[str] = Counter(a for t in traces for a in t.atoms)
total = sum(counts.values()) or 1
return {a: round(n / total, 4) for a, n in counts.most_common()}
# API.
app = FastAPI(title="ProcGrep explorer", docs_url="/api")
# Allow the published Pages explorer to call the live query/compare API, so the
# static site can run example queries over whole datasets (not just its embedded
# samples). Read-only JSON endpoints; no credentials.
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://hamidah.me",
"http://localhost:8000",
"http://127.0.0.1:8000",
],
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
class QueryRequest(BaseModel):
dataset: str = SUGGESTED[0]
pattern: str # regex over the space-joined atom spine
offset: int = 0 # paginate the matched-trace sample for "show more"
@app.get("/datasets")
def datasets() -> JSONResponse:
"""Suggested datasets plus which ones are already warm in the cache.
Store-backed datasets are surfaced first (they answer instantly), followed
by the curated suggestions, deduped in that order.
"""
store = _load_store()
store_ids = list(store.keys())
suggested = list(dict.fromkeys([*store_ids, *SUGGESTED]))
# datasets that carry a genuine resolution label, so the outcome axis applies.
outcome_datasets = [
d for d, ts in store.items() if any(t.outcome in ("resolved", "unresolved") for t in ts)
]
return JSONResponse(
{
"suggested": suggested,
"cached": list(_CACHE.keys()),
"meta": _META,
"outcome_datasets": outcome_datasets,
}
)
@app.post("/query")
def query(req: QueryRequest) -> JSONResponse:
"""Run a structural regex over a whole dataset's atom spines.
Returns the match count, per-model match rates, the matched-vs-all action
mix, and a sample of matched traces. Errors (bad pattern, ingest failure)
come back as a JSON ``error`` rather than a 500 so the client can show them.
"""
try:
rx = re.compile(req.pattern)
except re.error as exc:
return JSONResponse({"error": f"invalid pattern: {exc}"}, status_code=200)
try:
traces = _load(req.dataset)
except Exception as exc:
return JSONResponse({"error": f"{type(exc).__name__}: {exc}"}, status_code=200)
t0 = time.perf_counter()
hits = [t for t in traces if rx.search(t.spine)]
elapsed_ms = (time.perf_counter() - t0) * 1e3
by_model: dict[str, dict[str, int]] = {}
for t in traces:
by_model.setdefault(t.agent, {"n": 0, "hits": 0})["n"] += 1
for t in hits:
by_model[t.agent]["hits"] += 1
models = sorted(
(
{"model": m, "rate": round(c["hits"] / c["n"], 4), "n": c["n"]}
for m, c in by_model.items()
),
key=lambda r: -r["rate"],
)
return JSONResponse(
{
"dataset": req.dataset,
"pattern": req.pattern,
"n_traces": len(traces),
"n_hits": len(hits),
"elapsed_ms": round(elapsed_ms, 2),
"truncated": _META.get(req.dataset, {}).get("truncated", False),
"stats": {
k: _META.get(req.dataset, {}).get(k)
for k in (
"diversity_bits",
"median_len",
"median_cot",
"median_cot_tokens",
"exact_dup_rate",
"n_models",
)
},
"by_model": models,
"mix_all": _action_mix(traces),
"mix_hits": _action_mix(hits) if hits else {},
"atom_color": ATOM_COLOR,
"n_shown_from": req.offset,
"hits": [
{
"trace_id": t.trace_id,
"model": t.agent,
"task": t.task,
"outcome": t.outcome,
"cot_tokens": t.cot_tokens,
"steps": len(t.atoms),
"atoms": list(t.atoms[:200]),
}
for t in hits[req.offset : req.offset + HIT_SAMPLE]
],
}
)
class CompareRequest(BaseModel):
"""A side-by-side comparison along one axis.
axis="agent": ``dataset`` is one store dataset; ``left``/``right`` are agent
names within it. axis="eval": ``left``/``right`` are dataset ids (``dataset``
is ignored). axis="outcome" is reserved for the outcome-column build.
"""
axis: str = "agent"
dataset: str = SUGGESTED[0]
left: str
right: str
def _side_traces(axis: str, dataset: str, value: str) -> tuple[list[CachedTrace], str]:
"""Resolve one side of a comparison to its traces and a display label."""
if axis == "eval":
# org/short label so same-basename datasets stay distinct, e.g.
# nebius/SWE-agent vs ElenaFu/SWE-agent.
org, _, name = value.partition("/")
short = name.replace("-trajectories", "") or name
return _load(value), f"{org}/{short}" if org else name
if axis == "agent":
return [t for t in _load(dataset) if t.agent == value], value
if axis == "outcome":
# value is "resolved" or "unresolved"; only datasets carrying a genuine
# resolution label have these, so other datasets yield empty sides.
return [t for t in _load(dataset) if t.outcome == value], value
raise ValueError(f"unsupported axis: {axis}")
def _mix_vector(
mix_a: dict[str, float], mix_b: dict[str, float]
) -> tuple[list[float], list[float]]:
"""Align two action-mix dicts onto a shared atom alphabet for JSD."""
keys = sorted(set(mix_a) | set(mix_b))
return [mix_a.get(k, 0.0) for k in keys], [mix_b.get(k, 0.0) for k in keys]
def _side_summary(traces: list[CachedTrace], label: str) -> dict:
"""Trail stack plus the same quick stats the query view reports.
The stack is an even spread across the length distribution (short to long),
not the longest N, so it reads as a fingerprint of the whole group rather
than its tail.
"""
ordered = sorted((t for t in traces if t.atoms), key=lambda t: len(t.atoms))
if len(ordered) > CMP_STACK:
stride = len(ordered) / CMP_STACK
picked = [ordered[int(i * stride)] for i in range(CMP_STACK)]
else:
picked = ordered
return {
"label": label,
"n": len(traces),
"stats": _stats(traces),
"mix": _action_mix(traces),
"trails": [
{
"trace_id": t.trace_id,
"model": t.agent,
"task": t.task,
"outcome": t.outcome,
"cot_tokens": t.cot_tokens,
"steps": len(t.atoms), # true length; atoms below are capped for rendering
"atoms": list(t.atoms[:CMP_TRAIL_CAP]),
}
for t in picked
],
}
def _compute_compare(axis: str, dataset: str, left_value: str, right_value: str) -> dict:
"""Build the side-by-side diff payload, or an ``{"error": ...}`` dict.
Trail stacks plus a diff strip: the top procedures distinguishing the two
sides (signed log-odds, positive favors the left), the action-mix JSD, and
median length/CoT deltas. Reuses the library's BPE + discriminative_procedures
(design decision 6).
"""
try:
left, left_label = _side_traces(axis, dataset, left_value)
right, right_label = _side_traces(axis, dataset, right_value)
except Exception as exc:
return {"error": f"{type(exc).__name__}: {exc}"}
if left_label == right_label:
return {"error": "pick two different groups"}
if not left or not right:
return {"error": f"no traces for {(left_label if not left else right_label)!r}"}
t0 = time.perf_counter()
# Build labeled Traces (group = side) for a sampled BPE + discriminative pass.
sample = [
*(Trace(t.trace_id, t.agent, t.atoms, group=left_label) for t in left[:CMP_SAMPLE]),
*(Trace(t.trace_id, t.agent, t.atoms, group=right_label) for t in right[:CMP_SAMPLE]),
]
procedures: list[dict] = []
try:
vocab = fit_bpe((t.atoms for t in sample), vocab_size=CMP_VOCAB)
fingerprints = encode(sample, vocab=vocab)
for d in discriminative_procedures(
fingerprints, vocab, group_a=left_label, group_b=right_label, k=10
):
procedures.append(
{
"atoms": d.procedure.split(PROCEDURE_SEPARATOR),
"log_odds": round(d.log_odds, 3),
"p_left": round(d.p_a, 4),
"p_right": round(d.p_b, 4),
}
)
except Exception: # diff is best-effort; the stacks still render without it
procedures = []
lsum, rsum = _side_summary(left, left_label), _side_summary(right, right_label)
pl, pr = _mix_vector(lsum["mix"], rsum["mix"])
elapsed_ms = (time.perf_counter() - t0) * 1e3
return {
"axis": axis,
"left": lsum,
"right": rsum,
"diff": {
"procedures": procedures,
"jsd": round(jsd(pl, pr), 4) if pl else None,
"len_delta": lsum["stats"].get("median_len", 0) - rsum["stats"].get("median_len", 0),
"cot_delta": lsum["stats"].get("median_cot", 0) - rsum["stats"].get("median_cot", 0),
},
"atom_color": ATOM_COLOR,
"elapsed_ms": round(elapsed_ms, 2),
}
@app.post("/compare")
def compare(req: CompareRequest) -> JSONResponse:
"""Cached side-by-side diff. Cache key is (axis, dataset, left, right).
The default landing pair is pre-warmed at startup (design decision 7) so the
comparator answers instantly rather than paying the first BPE pass live.
"""
key = (req.axis, req.dataset, req.left, req.right)
cached = _COMPARE_CACHE.get(key)
if cached is not None:
_COMPARE_CACHE.move_to_end(key)
return JSONResponse({**cached, "cached": True})
payload = _compute_compare(req.axis, req.dataset, req.left, req.right)
if "error" not in payload:
_COMPARE_CACHE[key] = payload
while len(_COMPARE_CACHE) > COMPARE_CACHE_MAX:
_COMPARE_CACHE.popitem(last=False)
return JSONResponse(payload)
def _warm_default_compare() -> None:
"""Cache the comparator's default landing pair for every axis (agent, eval,
outcome), mirroring the frontend defaults so each axis lands instantly.
Runs in a background thread so it never blocks startup.
"""
try:
store = _load_store()
datasets = list(store.keys()) or list(SUGGESTED)
except Exception:
return
def warm(axis: str, dataset: str, left: str, right: str) -> None:
key = (axis, dataset, left, right)
if key not in _COMPARE_CACHE:
with contextlib.suppress(Exception):
_COMPARE_CACHE[key] = _compute_compare(axis, dataset, left, right)
# agent: first dataset, its two most common agents (frontend default).
try:
agents = [a for a, _ in Counter(t.agent for t in _load(datasets[0])).most_common()]
if len(agents) >= 2:
warm("agent", datasets[0], agents[0], agents[1])
except Exception:
pass
# eval: first two store datasets.
if len(datasets) >= 2:
warm("eval", datasets[0], datasets[0], datasets[1])
# outcome: first dataset that carries a resolved label, resolved vs unresolved.
outcome_ds = [
d for d, ts in store.items() if any(t.outcome in ("resolved", "unresolved") for t in ts)
]
if outcome_ds:
warm("outcome", outcome_ds[0], "resolved", "unresolved")
@app.on_event("startup")
def _warm_on_startup() -> None:
threading.Thread(target=_warm_default_compare, daemon=True).start()
@app.get("/groups")
def groups(dataset: str = SUGGESTED[0]) -> JSONResponse:
"""Agents present in a dataset (for the comparator's agent picker)."""
try:
traces = _load(dataset)
except Exception as exc:
return JSONResponse({"error": f"{type(exc).__name__}: {exc}"}, status_code=200)
counts = Counter(t.agent for t in traces)
agents = [{"agent": a, "n": n} for a, n in counts.most_common()]
return JSONResponse({"dataset": dataset, "agents": agents})
@app.get("/")
def index() -> FileResponse:
return FileResponse(STATIC / "index.html")
app.mount("/static", StaticFiles(directory=STATIC), name="static")