Spaces:
Running on Zero
Running on Zero
#!/usr/bin/env python3
"""Her · हेर — local API server. 100% LOCAL, 127.0.0.1 ONLY.

A thin HTTP transport over the deterministic engine. It does three jobs and no
more (the engine stays the product; this just carries its output to the UI):

  GET  /api/health            -> {ok, llama}   liveness + model reachable?
  GET  /api/sessions          -> projects[] of real sessions (discovery.py; cwd from inside files)
  GET  /api/analyze?path=..   -> enriched engine JSON (cli/analyze, cached by mtime)
  POST /api/chat {question, path}   grounded Q&A over ONE session's trace
  GET  /  (and assets)        -> the built UI (ui/dist)   single origin, no CORS

Non-negotiables honoured:
  * NO model and NO network in the engine path; the ONLY model calls are narrator
    prose (chat, overview, advice), and they go to the LOCAL llama-server via
    NarratorClient (localhost-guarded).
  * Trace content never leaves the machine: bind 127.0.0.1, llama is localhost,
    no outbound calls carry trace content. The one egress path is the passive
    enricher, which sends bare binary names only and can be disabled with
    HER_ENRICH=0.
  * cwd is trusted from inside each file (discovery.py), never decoded from the
    lossy folder name.
  * Path safety: only .jsonl files under ~/.claude, this repo, or the optional
    HER_EXTRA_ROOT directory may be read.
"""
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import urllib.parse | |
| from collections import Counter | |
| from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer | |
| from pathlib import Path | |
| REPO = Path(__file__).resolve().parent.parent | |
| if str(REPO) not in sys.path: | |
| sys.path.insert(0, str(REPO)) | |
| from engine.contract import to_jsonable # noqa: E402 | |
| from engine.core.analyze import analyze_path # noqa: E402 | |
| from engine.core.best_practices import practice_for # noqa: E402 | |
| from engine.core.binaries_db import load_registry # noqa: E402 | |
| from engine.core import impact # noqa: E402 | |
| from engine.loaders.jsonl_loader import load # noqa: E402 | |
| from engine.entities import extract_entities, entity_totals # noqa: E402 | |
| from engine.binaries import extract_binaries, unknown_binary_names # noqa: E402 | |
| from engine import discovery # noqa: E402 | |
| from narrator.client import NarratorClient # noqa: E402 | |
| from narrator.factory import get_narrator # noqa: E402 | |
# Loopback only: the server never listens on a routable interface.
HOST = "127.0.0.1"
# HER_PORT takes precedence, then TRACE_PORT, then the built-in default 8765.
PORT = int(os.environ.get("HER_PORT", os.environ.get("TRACE_PORT", "8765")))
DIST = REPO.joinpath("ui", "dist")
PUBLIC = REPO.joinpath("ui", "public")
# The ONE bundled demo session (identity-sanitized). It is NOT a default: it loads
# only via the explicit "__demo__" sentinel (the landing demo button), never as
# an auto-fallback for a missing/empty path.
DEMO = REPO.joinpath("fixtures", "demo-session.jsonl")
CLAUDE_DIR = Path.home().joinpath(".claude").resolve()
# An extra allowed root for session files. The ZeroGPU Space mounts an HF storage
# bucket at /data and sets HER_EXTRA_ROOT=/data; uploaded sessions live under it
# (namespaced per client). The local product leaves this unset -> behavior unchanged.
_EXTRA_ROOT_ENV = os.environ.get("HER_EXTRA_ROOT")
EXTRA_ROOT = None if not _EXTRA_ROOT_ENV else Path(_EXTRA_ROOT_ENV).resolve()
# --------------------------------------------------------------------------- #
# analyze cache — keyed by (realpath, mtime) so editing/replacing a file busts it
# --------------------------------------------------------------------------- #
# Maps (str(path), st_mtime_ns) -> the enriched analysis payload built by
# _analyze_cached. It holds at most one session: the cache is cleared on every miss.
_CACHE: dict[tuple[str, int], dict] = {}
# Passive enricher work-queue: bare binary names discovered during analysis that
# the registry can't yet name. The background daemon (Phase B) drains this; until
# then it just accumulates (deduped, bounded) and nothing blocks the response.
# Deduplication comes from the set itself; the bound (500 names) is enforced by
# _enqueue_unknown_binaries.
_ENRICH_QUEUE: "set[str]" = set()
| def _enqueue_unknown_binaries(binaries: list) -> None: | |
| """Add not-yet-identified binary NAMES (bare data only — never command text) | |
| to the enricher queue. Fire-and-forget; safe if the enricher is disabled.""" | |
| if os.environ.get("HER_ENRICH") == "0": | |
| return | |
| for u in unknown_binary_names(binaries): | |
| if len(_ENRICH_QUEUE) < 500: | |
| _ENRICH_QUEUE.add(u["name"]) | |
# --------------------------------------------------------------------------- #
# consent — the first-run disclaimer's opt-in for sharing learnings (default on).
# Persisted to ~/.her-consent.json so the daemon knows whether to upload and the
# user is asked only once. The disclaimer + slider live in the UI (DisclaimerModal).
# --------------------------------------------------------------------------- #
# Where the consent state is stored (a small JSON object in the user's home).
CONSENT_PATH = Path.home() / ".her-consent.json"
# In-memory consent state; replaced wholesale by _load_consent / _save_consent.
# "accepted": the disclaimer was acknowledged; "share": sharing learnings is allowed.
_CONSENT: dict = {"accepted": False, "share": True}  # default share=on (per owner)
def _load_consent() -> None:
    """Refresh the in-memory consent state from CONSENT_PATH, if it is usable.

    A missing, unreadable or malformed file — or one whose JSON is not an
    object — leaves the current state untouched. A missing "share" key counts
    as True; a missing "accepted" key counts as False.
    """
    global _CONSENT
    try:
        stored = json.loads(CONSENT_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return
    if not isinstance(stored, dict):
        return
    _CONSENT = {
        "accepted": bool(stored.get("accepted")),
        "share": bool(stored.get("share", True)),
    }
def _save_consent(accepted: bool, share: bool) -> None:
    """Record the user's consent choice in memory and, best-effort, on disk.

    The in-memory state is always updated. Writing CONSENT_PATH is allowed to
    fail with OSError (e.g. a read-only home directory) without raising.
    """
    global _CONSENT
    state = {"accepted": bool(accepted), "share": bool(share)}
    _CONSENT = state
    serialized = json.dumps(state)
    try:
        CONSENT_PATH.write_text(serialized, encoding="utf-8")
    except OSError:
        # Persistence is optional; the in-memory state above still applies.
        pass
# Import-time side effect: pick up any consent choice saved by an earlier run.
_load_consent()
def _enricher_daemon() -> None:
    """PASSIVE background worker: drain the unknown-binary queue and enrich it via
    the local model + public package registries (bare names only — the one
    owner-approved egress, NN#2). Never blocks any request.

    Runs forever. Every 5 seconds it pops up to 8 names from `_ENRICH_QUEUE` and
    hands them to `enrich_names`. When that call reports something learned, the
    analyze/brief caches are cleared so the new product name + logo appear on the
    next view, and the credential-scrubbed learnings file is shared to the
    write-only R2 collector — but ONLY when all three gates are open:
    HER_SHARE=1 in the environment (it defaults to "0"), the disclaimer was
    accepted, and consent.share is on.

    If `narrator.enricher` cannot be imported the function returns immediately
    (the queue then just accumulates). Exceptions raised by `enrich_names` or
    `share_learnings` are swallowed so the worker keeps running.

    Opt out of enrichment with HER_ENRICH=0; opt out of sharing in the disclaimer.
    """
    import time
    try:
        from narrator.enricher import enrich_names, share_learnings
    except Exception:
        return  # enricher not available -> stay silent, queue just accumulates
    while True:
        time.sleep(5)
        if not _ENRICH_QUEUE:
            continue
        # take a small batch so one pass never holds a large slice of the queue
        batch = []
        while _ENRICH_QUEUE and len(batch) < 8:
            batch.append(_ENRICH_QUEUE.pop())
        try:
            learned = enrich_names(batch)
        except Exception:
            # a failed batch counts as "nothing learned"; its names are not re-queued here
            learned = 0
        if learned:
            # the registry mtime-cache auto-refreshes; bust the result caches so a
            # now-known binary stops showing as bare on the next analyze/project.
            _CACHE.clear()
            _BRIEF_CACHE.clear()
            # share the (scrubbed) learnings to R2 ONLY if the OWNER explicitly
            # enabled it (HER_SHARE=1) AND consent allows. DISABLED BY DEFAULT IN CODE:
            # HER_SHARE defaults to "0" here (and the hosted Space also sets it to 0), so
            # NO learnings ever egress unless someone deliberately opts in — a file
            # reader sees the phone-home is off in the default config. share_learnings()
            # re-checks the same flag itself, so this is defence-in-depth, not the only
            # gate.
            if (os.environ.get("HER_SHARE", "0") == "1"
                    and _CONSENT.get("accepted") and _CONSENT.get("share")):
                try:
                    share_learnings()
                except Exception:
                    pass
| def _start_enricher() -> None: | |
| """Start the passive enricher daemon thread unless disabled (HER_ENRICH=0).""" | |
| if os.environ.get("HER_ENRICH") == "0": | |
| return | |
| import threading | |
| threading.Thread(target=_enricher_daemon, daemon=True, name="her-enricher").start() | |
| def _serialize(result: dict) -> dict: | |
| return { | |
| "session": result["session"], | |
| "turns": [to_jsonable(t) for t in result["turns"]], | |
| "events": [to_jsonable(e) for e in result["events"]], | |
| "findings": result["findings"], | |
| "recommendations": result.get("recommendations", []), | |
| } | |
| def _safe_session_path(raw: str | None) -> Path | None: | |
| """Resolve a requested session path. Only .jsonl files under ~/.claude or the | |
| repo are allowed; everything else is refused. | |
| The literal sentinel "__demo__" resolves to the bundled demo session — this is the | |
| ONLY way it loads (the landing demo button sends it). An empty/None path is NOT a | |
| session and returns None: there is deliberately no silent demo/fixture default.""" | |
| if raw == "__demo__": | |
| return DEMO if DEMO.is_file() else None | |
| if not raw: | |
| return None | |
| try: | |
| p = Path(raw).expanduser().resolve() | |
| except (OSError, RuntimeError): | |
| return None | |
| if p.suffix != ".jsonl" or not p.is_file(): | |
| return None | |
| # Real ancestor containment (not a raw string prefix, which would accept a sibling | |
| # like <repo>-evil/x.jsonl). Allows ~/.claude and anything under the repo (incl. | |
| # the Space's REPO/.uploads). is_relative_to is Py3.9+; the repo targets 3.10+. | |
| roots = [CLAUDE_DIR, REPO.resolve()] + ([EXTRA_ROOT] if EXTRA_ROOT else []) | |
| try: | |
| ok = any(p.is_relative_to(r) for r in roots) | |
| except AttributeError: # pragma: no cover - Py<3.9 boundary-aware fallback | |
| ok = any((str(p) + os.sep).startswith(str(r) + os.sep) for r in roots) | |
| if not ok: | |
| return None | |
| return p | |
def _analyze_cached(path: Path) -> dict:
    """Return the enriched analysis for `path`, recomputing only when it changed.

    The cache key is (path string, mtime in ns), so editing or replacing the
    file forces a fresh analysis. On a miss the whole cache is emptied first:
    one session at a time is plenty and it keeps memory flat.
    """
    cache_key = (str(path), path.stat().st_mtime_ns)
    if cache_key in _CACHE:
        return _CACHE[cache_key]

    _CACHE.clear()
    analysis = _serialize(analyze_path(str(path)))
    session_turns = analysis["turns"]
    # named entities (skills / sub-agents / MCP) for per-session tracing
    analysis["entities"] = extract_entities(session_turns)
    # binaries run via Bash — a separate dimension from tool calls, enriched
    # from the registry
    found_binaries = extract_binaries(session_turns, load_registry())
    analysis["binaries"] = found_binaries
    # actions worth reviewing + risk level + outcome (deterministic, suggest-only)
    analysis["impact"] = impact.detect_impact(session_turns, found_binaries)
    # unknown binaries go to the background enricher; this never blocks the response
    _enqueue_unknown_binaries(found_binaries)
    _CACHE[cache_key] = analysis
    return _CACHE[cache_key]
# --------------------------------------------------------------------------- #
# sessions inventory for the browser (discovery + light file stats)
# --------------------------------------------------------------------------- #
def _sessions_payload(projects_dir: str | None = None) -> dict:
    """Build the session browser payload: discovered sessions grouped by cwd.

    Sessions without a cwd are skipped. Within a project, sessions are ordered
    newest-first by file mtime; projects are ordered by session count
    (largest first), with cwd order breaking ties. A file that cannot be
    stat'ed is still listed, with mtime and size reported as 0.
    """
    grouped: dict[str, list[dict]] = {}
    for ref in discovery.discover_sessions(projects_dir):
        if not ref.cwd:
            continue
        try:
            info = os.stat(ref.path)
        except OSError:
            modified, nbytes = 0, 0
        else:
            modified, nbytes = int(info.st_mtime), info.st_size
        entry = {
            "path": ref.path,
            "sessionId": ref.sessionId,
            "encodedDir": ref.encodedDir,
            "mtime": modified,
            "sizeBytes": nbytes,
            # real session start time read from inside the file, so sessions
            # can be told apart; getattr keeps this safe if discovery is older.
            "startedAt": getattr(ref, "startedAt", None),
        }
        grouped.setdefault(ref.cwd, []).append(entry)

    projects = []
    for cwd in sorted(grouped):
        newest_first = sorted(grouped[cwd], key=lambda s: s["mtime"], reverse=True)
        projects.append({"cwd": cwd, "count": len(newest_first), "sessions": newest_first})
    # stable sort: projects with equal counts stay in cwd order
    projects.sort(key=lambda p: p["count"], reverse=True)

    return {
        "projects": projects,
        "total": sum(p["count"] for p in projects),
        "projectCount": len(projects),
    }
# --------------------------------------------------------------------------- #
# grounded chat — deterministic retrieval over ONE session, model writes prose
# --------------------------------------------------------------------------- #
# Lower-case words that _words() drops after tokenizing, so common question
# words do not count as keyword matches between a question and a turn.
_STOP = {"the", "and", "why", "did", "this", "that", "what", "how", "was", "were",
         "for", "with", "you", "are", "does", "doing", "happen", "happened",
         "show", "tell", "explain", "which", "where", "when", "who", "from",
         "into", "over", "about", "there", "here", "have", "has", "its"}
| def _words(text: str) -> list[str]: | |
| out, cur = [], [] | |
| for ch in (text or "").lower(): | |
| if ch.isalnum() or ch in "._/-": | |
| cur.append(ch) | |
| else: | |
| if cur: | |
| out.append("".join(cur)); cur = [] | |
| if cur: | |
| out.append("".join(cur)) | |
| return [w for w in out if len(w) >= 3 and w not in _STOP] | |
| def _turn_blob(t: dict) -> str: | |
| parts = [t.get("prompt", ""), t.get("reply", "")] | |
| for tc in t.get("tools", []): | |
| parts.append(tc.get("summary", "")) | |
| if tc.get("flowValue"): | |
| parts.append(str(tc["flowValue"])) | |
| if t.get("guide"): | |
| g = t["guide"] | |
| parts.append(f"{g.get('head','')} {g.get('body','')}") | |
| return " ".join(parts) | |
| def _best_practice_block(analysis: dict) -> str: | |
| """A compact, cited 'what could be better' block, built from the SAME | |
| deterministic `recommendations` the UI renders (engine output). Each line pairs | |
| the observed pattern with its cited Anthropic fix. Empty `recommendations` -> | |
| '' (silence is a valid result, build rule #6). The model may teach ONLY from | |
| what's here; it cannot invent a best practice.""" | |
| recs = analysis.get("recommendations", []) or [] | |
| if not recs: | |
| return "" | |
| lines = [ | |
| "WHAT COULD BE BETTER (deterministic signals + the cited Anthropic best " | |
| "practice each maps to; suggest-only, cite the turn):" | |
| ] | |
| source = None | |
| for r in recs: | |
| tstr = ", ".join(f"turn {i}" for i in r.get("turns", [])) | |
| practice = r.get("practice") | |
| head = r.get("headline", "") | |
| advice = r.get("advice", "") | |
| if practice: | |
| lines.append(f"- {tstr}: {head} -> best practice \"{practice}\": {advice}") | |
| source = r.get("source") or source | |
| else: | |
| lines.append(f"- {tstr}: {head} — {advice}") | |
| if source: | |
| lines.append(f"(Source: {source})") | |
| return "\n".join(lines) | |
def _retrieve(analysis: dict, question: str) -> tuple[int, list[int], str]:
    """Pick the turns most relevant to `question` and build the model's context.

    Deterministic: every turn is scored by keyword overlap with the question,
    plus boosts for explicit 'turn N' / 'query N' references (+100), cost
    intent on heavy (+3) or guided (+2) turns, error intent on turns with an
    errored tool (+3), and context-window intent on compaction turns (+3).
    Ties go to the earlier turn. If nothing scores above zero, the focus falls
    back to the heavy turn with the most cache-read tokens (or turn 0).

    Returns (focus_turn_index, cited_turn_indices, context_text). For a session
    with no turns the result is (0, [], <session header only>).

    Turn numbers are used as positions in `turns`, i.e. this assumes
    turns[n]["i"] == n — NOTE(review): confirm against the loader.
    """
    turns = analysis["turns"]
    sess = analysis["session"]
    qwords = set(_words(question))
    ql = (question or "").lower()

    def _num(value, default=0):
        # A metric that is present but None must not break ":," formatting.
        return default if value is None else value

    # explicit "turn N" / "query N" references
    explicit: set[int] = set()
    toks = ql.replace("#", " ").split()
    for pos, tok in enumerate(toks[:-1]):
        if tok in ("turn", "query", "turns", "queries"):
            # isdecimal, not isdigit: isdigit also accepts characters such as
            # "²" that int() rejects with ValueError.
            num = "".join(c for c in toks[pos + 1] if c.isdecimal())
            # the length cap keeps int() clear of the str->int digit limit
            if num and len(num) <= 9:
                explicit.add(int(num))

    cost_intent = any(w in ql for w in ("expensive", "cost", "slow", "heavy", "token",
                                        "loop", "re-read", "reread", "churn", "spend"))
    err_intent = any(w in ql for w in ("error", "fail", "failed", "broke", "broken", "wrong", "stuck"))
    # window intent: questions about the live context window / fill / compaction —
    # answered from the deterministic gauge (session.context), NOT the cumulative sums.
    ctx_intent = any(w in ql for w in ("context window", "window", "compact", "fill",
                                       "full", "fit", "1m", "overflow", "ran out", "gauge"))

    ctxw = sess.get("context", {}) or {}
    comps = ctxw.get("compactions", []) or []
    over = ctxw.get("overLimit", []) or []
    compact_turns = {c.get("atTurn") for c in comps}

    scored = []
    for t in turns:
        blob = set(_words(_turn_blob(t)))
        score = len(qwords & blob)
        if t["i"] in explicit:
            score += 100
        if cost_intent and t.get("heavy"):
            score += 3
        if cost_intent and t.get("guide"):
            score += 2
        if err_intent and any(tc.get("errored") for tc in (t.get("tools") or [])):
            score += 3
        if ctx_intent and t["i"] in compact_turns:  # window question -> surface compactions
            score += 3
        scored.append((score, -t["i"], t))  # tie-break: earlier turn first
    # sort on the numeric fields only, so a full tie never compares the turn dicts
    scored.sort(key=lambda s: (s[0], s[1]), reverse=True)

    if not scored:
        # empty session: nothing to focus on or cite
        focus, top = 0, []
    elif scored[0][0] <= 0:
        # the question matched nothing: fall back to the heaviest heavy turn
        heavy = [h for h in (sess.get("heavyTurns") or [])
                 if isinstance(h, int) and 0 <= h < len(turns)] or [0]
        focus = max(heavy, key=lambda h: turns[h]["tokens"]["cacheRead"])
        top = [focus]
    else:
        focus = scored[0][2]["i"]
        top = [s[2]["i"] for s in scored[:3] if s[0] > 0]

    # build a compact, faithful context from the chosen turns
    # CUMULATIVE token sums (no ceiling — re-paid every round-trip) vs the POINT-IN-TIME
    # window gauge (bounded by the model's window). Spell out both so the model never
    # conflates a multi-million cache-read total with the <=1M context window.
    sess_tokens = sess.get("tokens") or {}
    # NOTE(review): "real retry loops 0" below is a fixed string, not a computed value.
    lines = [
        f"SESSION: cwd={sess.get('cwd')} · {sess.get('turns')} turns "
        f"({sess.get('humanTurns')} human, {sess.get('systemTurns')} system) · "
        f"{sess.get('tools')} tool calls · cache re-reads {_num(sess_tokens.get('cacheRead')):,} "
        f"(CUMULATIVE across all round-trips, ~{round(_num(sess.get('cacheReadOverOut')))}x generated — NOT window size) · "
        f"agent-driven {round(100*_num(sess.get('indirectRatio')))}% "
        f"({sess.get('indirect')} indirect / {sess.get('direct')} direct) · "
        f"heavy turns {sess.get('heavyTurns')} · real retry loops 0.",
        f"CONTEXT WINDOW (point-in-time gauge, bounded by the model's window): "
        f"peak fill {_num(ctxw.get('peak')):,} / {_num(ctxw.get('limit'), 1_000_000):,} "
        f"({round(100*_num(ctxw.get('peakPct')))}% of the window) · "
        f"compactions: {len(comps)}"
        + (f" (at turns {[c.get('atTurn') for c in comps]}, e.g. {_num(comps[0].get('before')):,}->{_num(comps[0].get('after')):,})" if comps else " (the window never had to be trimmed)")
        + (f" · WARNING: {len(over)} request(s) reported occupancy ABOVE the window (turns {over}) — the source data or parse is suspect" if over else "")
        + ". This gauge is point-in-time; the cache-read total above is cumulative — they are different quantities and the cumulative one is expected to exceed the window.",
    ]
    # Always include the cited best-practice block (when any signal fired) so
    # "what could I have done better?" is answerable even when keyword scoring
    # wouldn't surface the relevant turns.
    bp_block = _best_practice_block(analysis)
    if bp_block:
        lines.append("\n" + bp_block)

    for i in top:
        t = turns[i]
        tools = t.get("tools") or []
        toolbits = []
        for tc in tools[:14]:
            tag = tc.get("provenance", "direct")
            if tc.get("flowValue"):
                tag += f"<-{tc.get('sourceTool')}:{tc['flowValue']}"
            if tc.get("errored"):
                tag += ",ERRORED"
            toolbits.append(f"{(tc.get('summary') or '')[:70]} [{tag}]")
        more = f" (+{len(tools)-14} more)" if len(tools) > 14 else ""
        guide = ""
        if t.get("guide"):
            guide = f" GUIDE[{t['guide'].get('head')}]: {t['guide'].get('body')}"
        lines.append(
            f"\nTURN {i} ({t.get('origin')}){' HEAVY' if t.get('heavy') else ''}: "
            f"prompt={(t.get('prompt') or '')[:300]!r}\n"
            f" reply={(t.get('reply') or '')[:240]!r}\n"
            f" tokens: cacheRead={t['tokens']['cacheRead']:,} out={t['tokens']['out']:,} "
            f"reqs={t.get('reqs')} · direct={t.get('direct')} indirect={t.get('indirect')}{guide}\n"
            f" tools: " + " | ".join(toolbits) + more
        )

    # cite the retrieved turns plus any explicitly referenced turn that really exists
    valid_explicit = explicit & {t["i"] for t in turns}
    return focus, sorted(set(top) | valid_explicit), "\n".join(lines)
# System prompt for the grounded chat (_chat). It restricts the model to the
# supplied trace context, keeps cumulative token sums apart from the
# point-in-time context-window gauge, and limits "what could be better" answers
# to the deterministic block produced by _best_practice_block.
_CHAT_SYSTEM = (
    "You are a forensic assistant for ONE coding-agent session (Claude Code). "
    "Answer ONLY from the TRACE CONTEXT provided — never invent files, tools, or "
    "numbers. Cite turns as 'turn N' using the turn numbers in the context. "
    "Numbers in the context are computed by a deterministic engine; quote them, "
    "do not recompute. Keep two quantities distinct and never conflate them: "
    "'cache re-reads' (and cost) are CUMULATIVE token sums across every round-trip "
    "and routinely reach the millions — they have no ceiling; the CONTEXT WINDOW "
    "gauge (peak fill / limit, e.g. 848k / 1M) is point-in-time and IS bounded by "
    "the window. A multi-million cache-read total does NOT mean the window overflowed. "
    "Only treat the window as over-full if the context explicitly flags a request above "
    "the limit. SUGGEST, never assert a fix ('looks like…', 'worth "
    "checking…', not 'the bug is X'). If the answer is not in the trace, say so "
    "plainly. Be concise: 2-4 sentences, plain English, no jargon dumps. "
    "If the user asks what they could have done better, use ONLY the items in the "
    "'WHAT COULD BE BETTER' block (each already carries the cited Anthropic best "
    "practice); cite the turn and phrase it as a gentle suggestion. Never introduce "
    "a best practice that is not in that block. If the block is absent, say the "
    "session looks clean and there's nothing notable to change."
)
| def _relevant_tool(turn: dict, qwords: set, err_intent: bool) -> int | None: | |
| """The single tool in a turn most relevant to the question — so a citation can | |
| land on the exact tool, not just the turn. Error-flavoured questions point at | |
| the first errored tool; otherwise the best keyword/flowValue overlap; else the | |
| first errored or first proven value-flow tool. Deterministic.""" | |
| tools = turn.get("tools", []) | |
| if not tools: | |
| return None | |
| if err_intent: | |
| for idx, tc in enumerate(tools): | |
| if tc.get("errored"): | |
| return idx | |
| best, best_score = None, 0 | |
| for idx, tc in enumerate(tools): | |
| blob = set(_words(" ".join([ | |
| tc.get("summary", ""), str(tc.get("flowValue") or ""), | |
| tc.get("name", ""), str(tc.get("sourceTool") or ""), | |
| ]))) | |
| score = len(qwords & blob) | |
| if score > best_score: | |
| best, best_score = idx, score | |
| if best is not None and best_score > 0: | |
| return best | |
| for idx, tc in enumerate(tools): | |
| if tc.get("errored"): | |
| return idx | |
| for idx, tc in enumerate(tools): | |
| if tc.get("provenance") == "indirect" and tc.get("flowValue"): | |
| return idx | |
| return None | |
| def _chip_label(turn: dict, tool_idx: int | None) -> str: | |
| """Friendly label for a citation chip: 'turn 5 · Bash ●err' / 'turn 9 · Read migrate.js'.""" | |
| i = turn["i"] | |
| if tool_idx is None: | |
| return f"turn {i}" | |
| tc = turn["tools"][tool_idx] | |
| name = f"{tc['mcp']['server']}:{tc['mcp']['tool']}" if tc.get("mcp") else tc.get("name", "tool") | |
| return f"turn {i} · {name}{' ●err' if tc.get('errored') else ''}" | |
def _chat(question: str, path: Path) -> dict:
    """Answer `question` about the session stored at `path`.

    Retrieval is deterministic (_retrieve); only the wording comes from the
    local model. When the model is unreachable or returns nothing, a
    deterministic summary of the focus turn is used instead, so the feature
    works with the model off.

    Returns a dict with: answer, focusTurn, focusTool, citedTurns, citations
    (one {turn, tool, label} per cited turn), model (None when offline) and
    grounded (always True). For a session without any turns, focusTurn and
    focusTool are None and the citation lists are empty.
    """
    analysis = _analyze_cached(path)
    turns = analysis["turns"]
    if not turns:
        # Nothing to retrieve from or cite; answer plainly instead of indexing
        # into an empty turn list.
        return {
            "answer": "This session has no turns to answer from.",
            "focusTurn": None,
            "focusTool": None,
            "citedTurns": [],
            "citations": [],
            "model": None,
            "grounded": True,
        }

    qwords = set(_words(question))
    ql = (question or "").lower()
    err_intent = any(w in ql for w in ("error", "fail", "failed", "broke", "broken", "wrong", "stuck", "retry", "retries"))
    focus, cited, context = _retrieve(analysis, question)
    user = f"TRACE CONTEXT:\n{context}\n\nQUESTION: {question}\n\nAnswer from the trace above, citing turn numbers."

    model_used = None
    answer = None
    try:
        client = get_narrator()
        if client.wait_until_ready(max_wait=4.0, interval=1.0):
            model_used = client.model_id()
            answer = client.chat(_CHAT_SYSTEM, user, temperature=0.2, max_tokens=320)
    except Exception:
        # any narrator failure means "model offline": use the fallback below
        answer = None
    if not answer:
        # Deterministic fallback so the feature works even with the model off.
        t = turns[focus]
        answer = (
            f"(model offline — showing the trace) Turn {focus} is the most relevant: "
            f"{(t.get('prompt') or '')[:120]}… It made {len(t.get('tools') or [])} tool calls, "
            f"{t.get('indirect')} of them agent-driven, with "
            f"{t['tokens']['cacheRead']:,} context re-read tokens"
            + (f". Tip: {t['guide'].get('body')}" if t.get('guide') else ".")
        )

    # union any 'turn N' the model cited with the retrieval picks
    cited_set = set(cited)
    low = answer.lower().replace("#", " ").split()
    for pos, tok in enumerate(low[:-1]):
        # tolerate wrapped mentions such as "(turn 5)" or "**turn 5**"
        if not tok.lstrip("([{*_'\"`").startswith("turn"):
            continue
        # isdecimal, not isdigit: isdigit also accepts characters such as "²"
        # that int() rejects; the length cap avoids the str->int digit limit.
        num = "".join(c for c in low[pos + 1] if c.isdecimal())
        if num and len(num) <= 9 and 0 <= int(num) < len(turns):
            cited_set.add(int(num))

    # per-citation tool targeting -> the chip opens the turn AND selects the tool
    focus_tool = _relevant_tool(turns[focus], qwords, err_intent)
    citations = []
    for i in sorted(cited_set):
        tool = _relevant_tool(turns[i], qwords, err_intent)  # computed once per turn
        citations.append({"turn": i, "tool": tool, "label": _chip_label(turns[i], tool)})
    return {
        "answer": answer,
        "focusTurn": focus,
        "focusTool": focus_tool,
        "citedTurns": sorted(cited_set),
        "citations": citations,
        "model": model_used,
        "grounded": True,
    }
# --------------------------------------------------------------------------- #
# session overview — a plain-English "what happened" written by the local model
# (this section header previously read "HTTP handler"; the code directly below
# it is the overview prompt and builder)
# --------------------------------------------------------------------------- #
# Keyed like the analyze cache, per its type: (path string, mtime) -> overview dict.
_OVERVIEW_CACHE: dict[tuple[str, int], dict] = {}
# System prompt for _overview: a short, calm narrative that cites turns and
# suggests rather than asserts.
_OVERVIEW_SYSTEM = (
    "You explain what happened in ONE coding-agent session, in plain English for a "
    "non-expert. Read the ordered turns and write 3-5 calm sentences: what the user "
    "was trying to do, what the agent actually did, and how it ended. Name a few "
    "turns as 'turn N'. If something looks like a problem, SUGGEST ('looks like…'), "
    "never assert a fix. Do NOT dwell on token counts or cost — focus on the work "
    "and the outcome. No drama, no marketing; just what happened."
)
| def _overview(analysis: dict) -> dict: | |
| """A plain-English 'what happened overall' for the session — narrator prose, the | |
| ONLY model call here. Grounded in the ordered turns (prompts + replies + flags).""" | |
| turns = analysis["turns"] | |
| sess = analysis["session"] | |
| lines = [ | |
| f"SESSION: cwd={sess.get('cwd')} · {sess.get('turns')} turns " | |
| f"({sess.get('humanTurns')} human, {sess.get('systemTurns')} system) · " | |
| f"{sess.get('tools')} tool calls · heavy turns {sess.get('heavyTurns')}." | |
| ] | |
| for t in turns: | |
| tl = t.get("tools", []) | |
| err = sum(1 for tc in tl if tc.get("errored")) | |
| flags = [] | |
| if t.get("heavy"): | |
| flags.append("heavy") | |
| if err: | |
| flags.append(f"{err} errored") | |
| if t.get("guide"): | |
| flags.append("flagged-" + str(t["guide"].get("kind"))) | |
| lines.append( | |
| f"turn {t['i']} ({t.get('origin')}): {(t.get('prompt') or '')[:220]!r} " | |
| f"=> reply {(t.get('reply') or '')[:170]!r} " | |
| f"[{', '.join(flags) or 'clean'}; {len(tl)} tools]" | |
| ) | |
| context = "\n".join(lines)[:6500] | |
| try: | |
| client = get_narrator() | |
| if client.wait_until_ready(max_wait=4.0, interval=1.0): | |
| text = client.chat( | |
| _OVERVIEW_SYSTEM, | |
| "SESSION TURNS:\n" + context + "\n\nWrite the plain-English overview now.", | |
| temperature=0.3, max_tokens=300, | |
| ) | |
| return {"overview": text.strip(), "model": client.model_id()} | |
| except Exception: | |
| pass | |
| return {"overview": "", "model": None} | |
# --------------------------------------------------------------------------- #
# WHAT COULD HAVE BEEN BETTER — the engine DETECTS the fixable signals (proven,
# no model); the LOCAL model WRITES the advice, scoped to THIS session's objective
# and grounded in the cited Anthropic best practice. Model-for-prose-only: the
# finding is deterministic, only the wording is generated. Suggest, never assert.
# Falls back to the engine's transcribed fix text when the model is unreachable.
# --------------------------------------------------------------------------- #
# Keyed like the analyze cache, per its type: (path string, mtime) -> advice dict.
_ADVICE_CACHE: dict[tuple[str, int], dict] = {}
# System prompt for _advice: one already-detected pattern in, 2-3 sentences of
# session-scoped, suggest-only advice out.
_ADVICE_SYS = (
    "You advise someone learning to drive a coding agent (Claude Code). A "
    "DETERMINISTIC engine already detected ONE specific, fixable pattern in THIS "
    "session — you do not decide whether it happened, you only explain it well. "
    "Using (a) what the user set out to do, (b) what actually happened on the cited "
    "turn(s), and (c) the relevant Anthropic best practice given to you, write 2-3 "
    "sentences of advice that is SCOPED TO THIS SESSION: refer to what they were "
    "actually doing, name the turn ('on turn 9…'), and suggest a concrete better "
    "move grounded in the Anthropic practice. RULES: SUGGEST, never assert "
    "('you could', 'it would have helped' — never 'you must' or 'the bug is'). Do "
    "NOT give generic advice — tie it to this session's work. Do NOT invent files, "
    "tools, or facts not in the context. Plain English, no jargon. Prose only."
)
| def _advice(analysis: dict) -> dict: | |
| """Per fired signal, ask the local model for session-scoped advice. Returns | |
| {recommendations:[{...rec, scoped}], model}. `scoped` is the model's prose, or | |
| None when the model is offline (the UI then falls back to the engine's cited | |
| fix text). The deterministic detection (which turns, which signal) is untouched.""" | |
| recs = analysis.get("recommendations", []) or [] | |
| if not recs: | |
| return {"recommendations": [], "model": None} | |
| turns = analysis.get("turns", []) | |
| humans = [t for t in turns if t.get("origin") == "human"] | |
| objective = ((humans[0]["prompt"] if humans else (turns[0]["prompt"] if turns else "")) or "")[:600] | |
| by_i = {t["i"]: t for t in turns} | |
| client = None | |
| try: | |
| c = get_narrator() | |
| if c.wait_until_ready(max_wait=4.0, interval=1.0): | |
| client = c | |
| except Exception: | |
| client = None | |
| model_used = client.model_id() if client else None | |
| out = [] | |
| for r in recs: | |
| ctx_lines = [] | |
| for i in r.get("turns", []): | |
| t = by_i.get(i) | |
| if not t: | |
| continue | |
| tl = t.get("tools", []) or [] | |
| err = sum(1 for tc in tl if tc.get("errored")) | |
| mix = ", ".join(f"{c2} {n}" for n, c2 in Counter(tc.get("name") for tc in tl).most_common(4)) | |
| ctx_lines.append( | |
| f"turn {i}: {((t.get('prompt') or '')[:160])!r} · ran {len(tl)} tools " | |
| f"({mix}){f', {err} errored' if err else ''}" | |
| ) | |
| user = ( | |
| f"SESSION OBJECTIVE (what the user set out to do):\n{objective}\n\n" | |
| f"WHAT HAPPENED ON THE FLAGGED TURN(S):\n" + "\n".join(ctx_lines) + | |
| f"\n\nDETECTED PATTERN (deterministic): {r.get('headline')} (signal: {r.get('kind')})\n" | |
| f"RELEVANT ANTHROPIC BEST PRACTICE: {r.get('practice')} — {r.get('advice')}\n\n" | |
| "Write the scoped suggestion now." | |
| ) | |
| scoped = None | |
| if client: | |
| try: | |
| txt = client.chat(_ADVICE_SYS, user, temperature=0.3, max_tokens=210) | |
| scoped = txt.strip() if txt else None | |
| except Exception: | |
| scoped = None | |
| out.append({**r, "scoped": scoped}) | |
| return {"recommendations": out, "model": model_used} | |
# --------------------------------------------------------------------------- #
# PROJECT level — many sessions under one cwd. A plain-English changelog, an
# entity inventory (skills / sub-agents / MCP servers, traceable to sessions),
# and a cross-session chat ("when did we add column X to sql?").
# --------------------------------------------------------------------------- #
# Per-session brief facts, keyed by (str(path), st_mtime_ns) — see _brief.
# Cleared by the enricher daemon whenever it learns a new binary.
_BRIEF_CACHE: dict[tuple[str, int], dict] = {}
# String-keyed cache for project-level narration (presumably keyed by cwd —
# the code that fills it is not in this part of the file; verify).
_PROJECT_NARR_CACHE: dict[str, dict] = {}
_PROJECT_CAP = 24  # parse at most the N most-recent sessions, for responsiveness
def _brief(path: Path) -> dict:
    """Return the cached per-session brief for *path*, building it on a miss.

    Built from the LOADER output only (no provenance pass, no model call): turn
    and tool counts, summed token figures, a short title, named entities,
    binaries, impact actions, the files the session edited, and a search
    ``blob`` used by the project chat.

    The cache key is ``(str(path), st_mtime_ns)``, so a rewritten file misses and
    is parsed again. Briefs stored under the same path's earlier mtimes are
    dropped at that point, so a session file that keeps growing does not pile up
    stale entries.

    Exceptions from ``path.stat()`` or from the loader propagate to the caller.
    """
    st = path.stat()  # one stat: the cache key and the reported mtime share a snapshot
    key = (str(path), st.st_mtime_ns)
    hit = _BRIEF_CACHE.get(key)
    if hit is not None:
        return hit

    loaded = load(str(path))
    turns = [to_jsonable(t) for t in loaded["turns"]]
    sess = loaded["session"]
    humans = [t for t in turns if t.get("origin") == "human"]

    # A turn's prompt may be absent or None (the loop below already tolerates
    # that), so it is never indexed or sliced directly.
    first_prompt = str(humans[0].get("prompt") or "") if humans else ""
    if humans:
        raw_title = first_prompt
    elif turns:
        raw_title = str(turns[0].get("prompt") or "")
    else:
        raw_title = "(empty session)"
    title = " ".join(raw_title.split())[:100]

    ents = extract_entities(turns)
    bins = extract_binaries(turns, load_registry())
    imp = impact.detect_impact(turns, bins)

    parts = []
    edited: list[str] = []       # distinct files this session CHANGED, in first-seen order
    seen_edit: set[str] = set()  # membership check for `edited`
    for t in turns:
        parts.append(t.get("prompt", "") or "")
        parts.append((t.get("reply", "") or "")[:200])
        for tc in t.get("tools", []) or []:
            s = tc.get("summary", "") or ""
            parts.append(s)
            if tc.get("flowValue"):
                parts.append(str(tc["flowValue"]))
            # Per the loader's _summary(), only Edit/Write render as
            # "Edit <basename>" (Read renders as "Read …"), so this prefix picks
            # out files the session wrote rather than files it merely read.
            if s.startswith("Edit "):
                fn = s[5:].strip()
                if fn and fn not in seen_edit:
                    seen_edit.add(fn)
                    edited.append(fn)

    def _token_total(field: str):
        # Pure summation over the per-turn token rollup; a missing or None
        # rollup/field counts as 0 instead of breaking the sum.
        return sum(((t.get("tokens") or {}).get(field) or 0) for t in turns)

    brief = {
        "path": str(path), "sessionId": sess.get("sessionId"),
        "cwd": sess.get("cwd"), "gitBranch": sess.get("gitBranch"),
        "turns": len(turns), "humanTurns": len(humans),
        "tools": sum(len(t.get("tools", []) or []) for t in turns),
        # cost is the ranking key; cacheRead is kept as a secondary metric.
        "cost": _token_total("cost"),
        "cacheRead": _token_total("cacheRead"),
        "generated": _token_total("out"),
        "title": title, "firstPrompt": first_prompt[:300],
        "mtime": int(st.st_mtime),
        # Real session start/end timestamps (from inside the file) so the project
        # view can show WHEN each session ran, not just a file-mtime "age".
        "startedAt": sess.get("startedAt"), "endedAt": sess.get("endedAt"),
        "entities": ents, "entityTotals": entity_totals(ents),
        "binaries": bins,
        "impact": imp,
        "editedFiles": edited[:10],
        "blob": " ".join(parts)[:9000],
    }

    # Evict briefs cached for this path under older mtimes. The key list is
    # materialised first so the dict is not mutated while being iterated.
    for stale in [k for k in list(_BRIEF_CACHE) if k[0] == key[0]]:
        _BRIEF_CACHE.pop(stale, None)
    _BRIEF_CACHE[key] = brief
    return brief
def _project_sessions(cwd: str, projects_dir: str | None = None) -> list:
    """Return the discovered sessions whose cwd matches *cwd*, newest file first.

    *cwd* is normalised with ``discovery._norm`` before it is compared with each
    session's ``cwd``. Ordering is by file mtime, descending; a file whose mtime
    cannot be read is treated as mtime 0.
    """
    wanted = discovery._norm(cwd)

    def _mtime_or_zero(ref):
        try:
            return os.path.getmtime(ref.path)
        except OSError:
            return 0

    matching = [ref for ref in discovery.discover_sessions(projects_dir) if ref.cwd == wanted]
    return sorted(matching, key=_mtime_or_zero, reverse=True)
| def _aggregate_entities(briefs: list) -> dict: | |
| out = {"skills": {}, "subAgents": {}, "mcpServers": {}} | |
| for b in briefs: | |
| sid, path = b["sessionId"], b["path"] | |
| for kind in out: | |
| for e in b["entities"].get(kind, []): | |
| slot = out[kind].setdefault(e["name"], {"name": e["name"], "total": 0, "sessions": []}) | |
| slot["total"] += e["count"] | |
| slot["sessions"].append({ | |
| "sessionId": sid, "path": path, "count": e["count"], | |
| "turns": e.get("turns", []), "tools": e.get("tools"), | |
| }) | |
| return {k: sorted(v.values(), key=lambda x: (-x["total"], x["name"])) for k, v in out.items()} | |
| def _aggregate_binaries(briefs: list) -> list: | |
| """Roll every session's binaries up by name across the project, summing counts | |
| and recording which sessions/turns each appeared in (the cross-session | |
| traceback) — and carrying the registry metadata so the inventory shows the | |
| product name, blurb, logo and security note, not just the bare binary.""" | |
| out: dict = {} | |
| META = ("product", "blurb", "homepage", "logo", "security", "source", "updated") | |
| for b in briefs: | |
| sid, path = b["sessionId"], b["path"] | |
| for e in b.get("binaries", []) or []: | |
| slot = out.setdefault(e["name"], { | |
| "name": e["name"], "binary": e["name"], "total": 0, "sessions": [], | |
| "via": e.get("via"), "identified": bool(e.get("identified")), | |
| }) | |
| slot["total"] += e["count"] | |
| slot["sessions"].append({ | |
| "sessionId": sid, "path": path, "count": e["count"], | |
| "turns": e.get("turns", []), | |
| }) | |
| if e.get("identified"): # first identified session wins the display metadata | |
| slot["identified"] = True | |
| for k in META: | |
| if e.get(k) is not None and k not in slot: | |
| slot[k] = e[k] | |
| return sorted(out.values(), key=lambda x: (-x["total"], x["name"])) | |
| _RISK_RANK = {"None": 0, "Low": 1, "Medium": 2, "High": 3} | |
| _TAG_ORDER = {"PRODUCTION": 0, "SECURITY": 1, "NETWORK": 2, "CONFIG": 3} | |
# Single-entry cache for _project_actions(), keyed by (normalised cwd, signature
# of every session file's path + mtime).
_PROJECT_ACTIONS_CACHE: dict = {}


def _iter_bash_commands(path: str):
    """Yield the command string of every assistant Bash tool call in one session file.

    Lines that are not JSON objects, or whose ``message`` / ``content`` /
    ``input`` do not have the expected container type, are skipped instead of
    raising. ``OSError`` from opening or reading the file propagates.
    """
    # errors="replace": one undecodable byte must not abort the whole scan.
    with open(path, "r", encoding="utf-8", errors="replace") as fh:
        for line in fh:
            if '"Bash"' not in line:  # cheap pre-filter before paying for json.loads
                continue
            try:
                rec = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                continue
            if not isinstance(rec, dict) or rec.get("type") != "assistant":
                continue
            message = rec.get("message")
            content = message.get("content") if isinstance(message, dict) else None
            if not isinstance(content, list):
                continue
            for part in content:
                if not isinstance(part, dict):
                    continue
                if part.get("type") != "tool_use" or part.get("name") != "Bash":
                    continue
                args = part.get("input")
                command = args.get("command") if isinstance(args, dict) else None
                yield str(command or "")


def _project_actions(cwd: str, projects_dir: str | None = None) -> dict:
    """Whole-project 'actions worth reviewing', scanned across the project's sessions.

    This is the safety lens, so it does not stop at the parse-capped subset the
    changelog uses: every session of the project is scanned, newest first, up to
    a 250-session backstop for pathological project sizes. It is cheap: only
    Bash command strings are scanned with ``impact._scan_command`` (no full
    parse, no model). Each action records the sessions it happened in.

    Returns ``{"riskLevel": ..., "actions": [...]}``. The result is cached for
    one project at a time, keyed by the session files' paths and mtimes.
    """
    target = discovery._norm(cwd)
    # Newest-first, so the backstop below drops the OLDEST sessions rather than
    # an arbitrary subset.
    refs = _project_sessions(cwd, projects_dir)

    def _mtime(p) -> int:
        # A file can vanish between discovery and this call; treat it as mtime 0.
        try:
            return int(os.path.getmtime(p))
        except OSError:
            return 0

    sig = tuple(sorted((s.path, _mtime(s.path)) for s in refs))
    key = (target, sig)
    if key in _PROJECT_ACTIONS_CACHE:
        return _PROJECT_ACTIONS_CACHE[key]

    agg: dict = {}
    seen: dict = {}  # (tag, title) -> session ids already listed for that action
    for s in refs[:250]:  # backstop on pathological project sizes
        sid = s.sessionId
        try:
            for cmd in _iter_bash_commands(s.path):
                for tag, title, detail in impact._scan_command(cmd):
                    ident = (tag, title)
                    slot = agg.get(ident)
                    if slot is None:
                        slot = agg[ident] = {
                            "tag": tag, "title": title, "detail": detail,
                            "total": 0, "sessions": [],
                        }
                        seen[ident] = set()
                    slot["total"] += 1
                    if sid not in seen[ident]:
                        seen[ident].add(sid)
                        slot["sessions"].append({"sessionId": sid, "path": s.path})
        except OSError:
            # Unreadable session file: keep whatever was gathered and move on.
            continue

    actions = list(agg.values())
    actions.sort(key=lambda a: (impact._TAG_ORDER.get(a["tag"], 9), -a["total"], a["title"]))
    level, _reason = impact.risk_level(actions)
    result = {"riskLevel": level, "actions": actions}
    _PROJECT_ACTIONS_CACHE.clear()  # one project at a time is plenty
    _PROJECT_ACTIONS_CACHE[key] = result
    return result
def _aggregate_impact(briefs: list) -> dict:
    """Roll per-session impact up to the project level.

    Actions are merged by ``(tag, title)``: ``total`` counts the sessions that
    reported the action and ``sessions`` lists each one with the turns it
    happened in. ``riskLevel`` is the highest level seen across the briefs
    according to ``_RISK_RANK``. Actions are ordered by tag (``_TAG_ORDER``),
    then descending total, then title.
    """
    merged: dict = {}
    worst = "None"
    for brief in briefs:
        session_impact = brief.get("impact") or {}
        candidate = session_impact.get("riskLevel", "None")
        if _RISK_RANK.get(candidate, 0) > _RISK_RANK.get(worst, 0):
            worst = candidate
        for action in session_impact.get("actions", []) or []:
            ident = (action["tag"], action["title"])
            if ident not in merged:
                merged[ident] = {
                    "tag": action["tag"],
                    "title": action["title"],
                    "detail": action.get("detail", ""),
                    "total": 0,
                    "sessions": [],
                }
            entry = merged[ident]
            entry["total"] += 1
            entry["sessions"].append({
                "sessionId": brief["sessionId"],
                "path": brief["path"],
                "turns": action.get("turns", []),
            })

    ordered = list(merged.values())
    ordered.sort(key=lambda a: (_TAG_ORDER.get(a["tag"], 9), -a["total"], a["title"]))
    return {"riskLevel": worst, "actions": ordered}
| _PROJECT_NARR_SYSTEM = ( | |
| "You write a plain-English changelog of what happened across the coding-agent " | |
| "sessions in ONE project, for a non-expert. For each session (oldest first) you are " | |
| "given its short id and what it ACTUALLY DID — the files it changed, the actions it " | |
| "took, the tools / sub-agents / skills it used. Write flowing prose, no headers, no " | |
| "bullet list:\n" | |
| "- Open with one sentence naming what this project is and the through-line across " | |
| "the sessions.\n" | |
| "- Then describe the notable work. GROUP sessions that did the same kind of thing " | |
| "into one statement instead of repeating a line each. Cite sessions as [id].\n" | |
| "- Report what was BUILT or CHANGED (the files, the actions) — do NOT restate the " | |
| "request text. If many sessions show the SAME request (e.g. an automated security " | |
| "or PR-review pipeline), say that ONCE and focus on what differed, never echo it " | |
| "per session.\n" | |
| "Concrete and calm; suggest, don't assert. 4 to 8 sentences. Ground ONLY in what " | |
| "you are given — never invent files, tools, or features." | |
| ) | |
| # Auto-generated first prompts (a /security-review run, a slash-command preamble, a PR | |
| # template) repeat VERBATIM across sessions, so the bare first prompt is a useless, | |
| # identical "title" that makes the changelog parrot the same line N times (the screenshot | |
| # of "[id] Review this change for security vulnerabilities…" x16). Detect them so the | |
| # digest describes what the session DID rather than echoing the boilerplate ask. | |
| _BOILERPLATE_TITLE_RX = re.compile( | |
| r"review this change for security" | |
| r"|changed files \(you may read" | |
| r"|caveat: the messages below were generated" | |
| r"|opened (the |a )?pull request" | |
| r"|<command-(name|message|args)>" | |
| r"|^\s*/[a-z][\w-]*", | |
| re.I, | |
| ) | |
| def _session_digest(b: dict) -> str: | |
| """One DISTINCTIVE line per session for the changelog model: what it actually did | |
| (the request only if it's not boilerplate, plus impact actions, changed files, and | |
| named tools/agents/skills) — so the model has something to summarize beyond a first | |
| prompt that is identical across an automated-review project.""" | |
| sid = (b.get("sessionId") or "?")[:8] | |
| title = " ".join(str(b.get("title") or "").split()) | |
| bits: list[str] = [] | |
| if title and _BOILERPLATE_TITLE_RX.search(title): | |
| bits.append("automated security/PR-review run") | |
| elif title: | |
| bits.append(f"asked {title[:130]!r}") | |
| acts = [a["title"] for a in (b.get("impact") or {}).get("actions", [])[:3]] | |
| if acts: | |
| bits.append("did: " + "; ".join(acts)) | |
| edited = b.get("editedFiles") or [] | |
| if edited: | |
| more = f" +{len(edited) - 6} more" if len(edited) > 6 else "" | |
| bits.append(f"changed {', '.join(edited[:6])}{more}") | |
| used: list[str] = [] | |
| for kind, lbl in (("subAgents", "agents"), ("skills", "skills"), ("mcpServers", "mcp")): | |
| names = [e["name"] for e in b.get("entities", {}).get(kind, [])[:3]] | |
| if names: | |
| used.append(f"{lbl}:{','.join(names)}") | |
| tools = [x["name"] for x in (b.get("binaries") or [])[:3]] | |
| if tools: | |
| used.append("tools:" + ",".join(tools)) | |
| if used: | |
| bits.append(" · ".join(used)) | |
| body = " | ".join(bits) if bits else "(no notable activity)" | |
| return f"[{sid}] {b.get('turns', 0)} turns — {body}" | |
# Detail at most this many sessions in the changelog context; the rest are
# summarized by count so a big project can't overflow the model's output and get
# cut off mid-word.
_NARR_DETAIL_CAP = 20


def _project_narrative(cwd: str, briefs: list) -> dict:
    """Ask the local model for a plain-English changelog of the given sessions.

    The context lists the sessions oldest first. When there are more than
    ``_NARR_DETAIL_CAP`` of them, the MOST RECENT ones are detailed and the older
    remainder is represented by a single count line placed ahead of them.

    Returns ``{"narrative": str, "model": str | None}``. The narrative is empty
    when the model is not ready, fails, or returns nothing. Only a non-empty
    narrative is cached (keyed by the sessions' ids and mtimes), so an offline
    warm-up does not freeze an empty changelog in place.
    """
    mkey = "|".join(f"{b['sessionId']}:{b['mtime']}" for b in briefs)
    cached = _PROJECT_NARR_CACHE.get(mkey)
    if cached is not None:
        return cached

    ordered = sorted(briefs, key=lambda b: b["mtime"])  # oldest first
    lines = [f"PROJECT: {cwd} · {len(ordered)} session(s)."]
    omitted = max(0, len(ordered) - _NARR_DETAIL_CAP)
    if omitted:
        # The sessions left out are the oldest ones, so this line comes first.
        lines.append(f"(+{omitted} older session(s), similar — summarize by count)")
    lines.extend(_session_digest(b) for b in ordered[omitted:])
    context = "\n".join(lines)[:8000]

    result = {"narrative": "", "model": None}
    try:
        client = get_narrator()
        if client.wait_until_ready(max_wait=4.0, interval=1.0):
            txt = client.chat(
                _PROJECT_NARR_SYSTEM,
                "SESSIONS (oldest first):\n" + context + "\n\nWrite the changelog now.",
                temperature=0.3, max_tokens=700,
            )
            narrative = (txt or "").strip()
            if narrative:
                result = {"narrative": narrative, "model": client.model_id()}
    except Exception:
        # Best-effort by design: any narrator failure degrades to an empty
        # narrative rather than failing the project view.
        pass

    if result["narrative"]:
        _PROJECT_NARR_CACHE[mkey] = result
    return result
def _project(cwd: str, with_narrative: bool = True, projects_dir: str | None = None) -> dict:
    """Assemble the project-level payload for every session under *cwd*.

    At most ``_PROJECT_CAP`` of the most recent sessions are parsed into briefs;
    a session whose brief fails to build is skipped. ``sessionCount`` is the
    number of sessions found, ``shown`` the number actually parsed.
    """
    refs = _project_sessions(cwd, projects_dir)
    briefs = []
    for ref in refs[:_PROJECT_CAP]:
        try:
            briefs.append(_brief(Path(ref.path)))
        except Exception:
            continue

    # The narrative is the ONLY model call here. On the ZeroGPU Space it must be
    # invoked via the Gradio API (so auth headers forward for GPU quota), so the
    # plain-REST /api/project route passes with_narrative=False and the UI fetches
    # the prose separately through the `project_narrative` Gradio endpoint.
    if with_narrative:
        narr = _project_narrative(cwd, briefs)
    else:
        narr = {"narrative": "", "model": None}

    # Sessions are RANKED BY COST (Anthropic token consumption) — what the user
    # pays for — not by recency. Parsing is still capped by recency above; only
    # the ordering is by cost. Ties break on mtime, newest first.
    def _by_cost(brief):
        return (-brief.get("cost", 0), -brief.get("mtime", 0))

    ranked = sorted(briefs, key=_by_cost)
    # The search blob is internal to the chat and is not sent to the UI.
    sessions_out = [{k: v for k, v in b.items() if k != "blob"} for b in ranked]

    return {
        "cwd": cwd,
        "sessionCount": len(refs),
        "shown": len(briefs),
        "totalCost": sum(b.get("cost", 0) for b in briefs),
        "sessions": sessions_out,
        "entities": _aggregate_entities(briefs),
        "binaries": _aggregate_binaries(briefs),
        # Impact goes through _project_actions() rather than the parse-capped
        # briefs — the safety lens must also show an action from a session the
        # cap dropped.
        "impact": _project_actions(cwd, projects_dir),
        "narrative": narr.get("narrative", ""),
        "model": narr.get("model"),
    }
| # Anti-fabrication clause appended to every project-chat system prompt — the model | |
| # may ONLY use facts present in the context (this is what stops it inventing a | |
| # "smruti-deploy image" or a column that isn't in the trace). | |
| _NO_INVENT = ( | |
| " Use ONLY facts shown in the context. NEVER invent file names, image names, " | |
| "commands, columns, tables, or features that are not present. If the context " | |
| "doesn't say, reply that it isn't clearly in these sessions." | |
| ) | |
| _PROJECT_OVERVIEW_SYSTEM = ( | |
| "You explain, for a non-expert, what a multi-session coding PROJECT is and what " | |
| "was built across it. Ground your answer ONLY in the project changelog and the " | |
| "session titles/entities given. Write 3-5 plain sentences: the project's purpose " | |
| "and the main things built or changed. You may cite a few sessions as [id]." | |
| + _NO_INVENT | |
| ) | |
| _PROJECT_LOOKUP_SYSTEM = ( | |
| "You locate WHICH session in a project something happened in. Given candidate " | |
| "sessions (short id, title, matched snippets), name the session(s) by short id " | |
| "[id] and say what happened there, quoting only what the snippets actually show. " | |
| "If nothing matches, say it isn't clearly in these sessions. SUGGEST, never " | |
| "assert. 2-4 sentences. Remind the user they can open a named session to go deeper." | |
| + _NO_INVENT | |
| ) | |
| # Phrases / shape that mark a BROAD "tell me about the whole project" question | |
| # (grounded on the full changelog) vs a SPECIFIC lookup (keyword-retrieved). | |
| _BROAD_HINTS = ( | |
| "what was built", "what did we build", "what is this project", "what's this project", | |
| "what is the project", "what was the project", "overall", "in general", "high level", | |
| "high-level", "summary", "summarize", "the gist", "purpose", "what happened in this project", | |
| "what are these sessions", "what was done", "tell me about the project", "what's the project", | |
| ) | |
| _BROAD_STOP = { | |
| "overall", "summary", "summarize", "built", "build", "building", "overview", | |
| "everything", "across", "project", "projects", "gist", "about", "point", "purpose", | |
| "goal", "goals", "session", "sessions", "these", "this", "general", "high", "level", | |
| "mean", "meant", "made", "thing", "things", "stuff", | |
| } | |
| def _is_broad(question: str, qwords: set, top_score: int) -> bool: | |
| ql = (question or "").lower() | |
| if any(h in ql for h in _BROAD_HINTS): | |
| return True | |
| content = [w for w in qwords if w not in _BROAD_STOP] | |
| return len(content) <= 1 or top_score <= 1 | |
def _project_chat(question: str, cwd: str, projects_dir: str | None = None) -> dict:
    """Answer a question about a whole project, grounded in its sessions.

    Briefs are built for at most ``_PROJECT_CAP`` recent sessions and scored by
    how many question words appear in their title + blob. A broad question (see
    ``_is_broad``) is answered from the project changelog plus one line per
    session; a specific one from the best-scoring sessions with matched
    snippets. When the model gives no answer, a fixed fallback line names the
    closest session.

    Returns ``{"answer", "model", "sessionHits"}``. ``sessionHits`` lists the
    sessions the answer cited by short id first, then the default candidates,
    capped at five.
    """
    briefs = []
    for ref in _project_sessions(cwd, projects_dir)[:_PROJECT_CAP]:
        try:
            briefs.append(_brief(Path(ref.path)))
        except Exception:
            continue
    if not briefs:
        return {"answer": "No sessions found in this project.", "model": None, "sessionHits": []}

    def _short(brief) -> str:
        return (brief["sessionId"] or "?")[:8]

    qwords = set(_words(question))
    scored = [
        (len(qwords & set(_words(brief["title"] + " " + brief["blob"]))), brief)
        for brief in briefs
    ]
    scored.sort(key=lambda pair: (-pair[0], -pair[1]["mtime"]))
    top_score = scored[0][0] if scored else 0

    if _is_broad(question, qwords, top_score):
        # BROAD: ground on the whole project — the (already grounded) changelog
        # plus every session's title/entities. Synthesize; do not cherry-pick hits.
        narr = _project_narrative(cwd, briefs).get("narrative", "")
        by_activity = sorted(briefs, key=lambda brief: -brief["turns"])
        lines = [f"PROJECT CHANGELOG (grounded):\n{narr}", "", "ALL SESSIONS (most active first):"]
        for brief in by_activity:
            names = []
            for kind in ("skills", "mcpServers", "subAgents"):
                names.extend(e["name"] for e in brief["entities"].get(kind, [])[:2])
            line = f"[{_short(brief)}] {brief['turns']} turns · {brief['title']}"
            if names:
                line += f" · uses {','.join(names)}"
            lines.append(line)
        context = "\n".join(lines)[:7200]
        system = _PROJECT_OVERVIEW_SYSTEM
        default_hits = by_activity[:4]
    else:
        # SPECIFIC: keyword-retrieved candidate sessions with matched snippets.
        matched = [brief for score, brief in scored if score > 0][:4]
        hits0 = matched or [brief for _score, brief in scored][:2]
        lines = []
        for brief in hits0:
            blob = brief["blob"]
            lowered = blob.lower()
            snippets = []
            for word in list(qwords)[:6]:
                at = lowered.find(word)
                if at >= 0:
                    snippets.append(blob[max(0, at - 50):at + 70].replace("\n", " "))
            joined = " … ".join(snippets[:3])
            lines.append(
                f"[{_short(brief)}] ({brief['turns']} turns) title={brief['title']!r} snippets={joined!r}"
            )
        context = "\n".join(lines)[:6500]
        system = _PROJECT_LOOKUP_SYSTEM
        default_hits = hits0

    answer, model_used = None, None
    try:
        client = get_narrator()
        if client.wait_until_ready(max_wait=4.0, interval=1.0):
            model_used = client.model_id()
            prompt = "CONTEXT:\n" + context + f"\n\nQUESTION: {question}"
            answer = client.chat(system, prompt, temperature=0.1, max_tokens=320)
    except Exception:
        answer = None
    if not answer:
        closest = default_hits[0]
        answer = (
            f"(model offline) Closest match: session [{_short(closest)}] — "
            f"{closest['title']}. Open it to go deeper."
        )

    # chips = the sessions the answer actually cited (by short id), then the defaults
    by_short = {(brief["sessionId"] or "")[:8]: brief for brief in briefs if brief.get("sessionId")}
    cited = []
    for token in re.findall(r"\[([0-9a-fA-F]{6,8})\]", answer):
        found = by_short.get(token.lower()[:8])
        if found is not None and found not in cited:
            cited.append(found)
    hits = (cited + [brief for brief in default_hits if brief not in cited])[:5]

    return {
        "answer": answer,
        "model": model_used,
        "sessionHits": [
            {
                "sessionId": brief["sessionId"],
                "path": brief["path"],
                "title": brief["title"],
                "turns": brief["turns"],
            }
            for brief in hits
        ],
    }
class Handler(BaseHTTPRequestHandler):
    """HTTP handler for the JSON API and the built UI.

    GET serves /api/health, /api/consent, /api/sessions, /api/analyze,
    /api/overview, /api/advice and /api/project; any other path is treated as a
    static asset. POST serves /api/chat, /api/project_chat and /api/consent.
    Every response is sent with ``Cache-Control: no-store``.
    """

    server_version = "her/1.0"

    def _send(self, code: int, body: bytes, ctype: str):
        """Write a complete response: status line, headers, then *body*."""
        self.send_response(code)
        self.send_header("Content-Type", ctype)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Cache-Control", "no-store")
        try:
            self.end_headers()
            self.wfile.write(body)
        except (BrokenPipeError, ConnectionResetError):
            # The client went away mid-response; there is nobody left to tell.
            pass

    def _json(self, obj, code: int = 200):
        """Serialise *obj* as UTF-8 JSON and send it with status *code*."""
        self._send(code, json.dumps(obj, ensure_ascii=False).encode("utf-8"), "application/json")

    def log_message(self, *args):  # quiet; this is a local tool
        pass

    # -- GET: api + static -------------------------------------------------- #
    def do_GET(self):
        """Dispatch a GET request to an API route or to static file serving."""
        u = urllib.parse.urlparse(self.path)
        q = urllib.parse.parse_qs(u.query)

        if u.path == "/api/health":
            llama = False
            try:
                llama = get_narrator().wait_until_ready(max_wait=0.1, interval=0.1)
            except Exception:
                llama = False
            return self._json({"ok": True, "llama": llama})

        if u.path == "/api/consent":
            return self._json(_CONSENT)

        if u.path == "/api/sessions":
            try:
                return self._json(_sessions_payload())
            except Exception as e:  # never 500 the browser
                return self._json({"error": str(e), "projects": [], "total": 0}, 200)

        if u.path == "/api/analyze":
            path = _safe_session_path((q.get("path") or [None])[0])
            if path is None:
                return self._json({"error": "path not allowed"}, 400)
            try:
                return self._json(_analyze_cached(path))
            except Exception as e:
                return self._json({"error": f"analyze failed: {e}"}, 500)

        if u.path == "/api/overview":
            path = _safe_session_path((q.get("path") or [None])[0])
            if path is None:
                return self._json({"error": "path not allowed"}, 400)
            try:
                key = (str(path), path.stat().st_mtime_ns)
                cached = _OVERVIEW_CACHE.get(key)
                if cached is None:
                    cached = _overview(_analyze_cached(path))
                    _OVERVIEW_CACHE.clear()
                    _OVERVIEW_CACHE[key] = cached
                # Answer from the local value: another request thread may clear
                # the single-entry cache between the store and a re-read.
                return self._json(cached)
            except Exception as e:
                return self._json({"overview": "", "error": str(e)}, 200)

        if u.path == "/api/advice":
            path = _safe_session_path((q.get("path") or [None])[0])
            if path is None:
                return self._json({"error": "path not allowed"}, 400)
            try:
                key = (str(path), path.stat().st_mtime_ns)
                cached = _ADVICE_CACHE.get(key)
                if cached is None:
                    result = _advice(_analyze_cached(path))
                    # Only cache once the model actually wrote prose, so an offline
                    # warm-up doesn't freeze the deterministic fallback in place.
                    if result.get("model"):
                        _ADVICE_CACHE.clear()
                        _ADVICE_CACHE[key] = result
                    cached = result
                return self._json(cached)
            except Exception as e:
                return self._json({"recommendations": [], "model": None, "error": str(e)}, 200)

        if u.path == "/api/project":
            cwd = (q.get("cwd") or [""])[0]
            if not cwd:
                return self._json({"error": "cwd required"}, 400)
            try:
                return self._json(_project(cwd))
            except Exception as e:
                return self._json({"error": f"project failed: {e}"}, 500)

        return self._serve_static(u.path)

    # -- POST: chat --------------------------------------------------------- #
    def do_POST(self):
        """Handle the chat, project-chat and consent POST routes.

        The body must be a JSON object. Anything else — malformed JSON, a JSON
        array or scalar, a negative Content-Length — is answered with
        400 "bad json".
        """
        u = urllib.parse.urlparse(self.path)
        if u.path not in ("/api/chat", "/api/project_chat", "/api/consent"):
            return self._json({"error": "not found"}, 404)
        try:
            n = int(self.headers.get("Content-Length", "0"))
            if n < 0:
                # read(-1) would block until the client closes the connection.
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(n) or "{}")
        except ValueError:  # covers json.JSONDecodeError and undecodable bytes
            return self._json({"error": "bad json"}, 400)
        if not isinstance(body, dict):
            return self._json({"error": "bad json"}, 400)

        # first-run disclaimer choice: {accepted, share}. Persisted; gates sharing.
        if u.path == "/api/consent":
            _save_consent(bool(body.get("accepted", True)), bool(body.get("share", True)))
            return self._json(_CONSENT)

        question = body.get("question")
        question = question.strip() if isinstance(question, str) else ""
        if not question:
            return self._json({"error": "empty question"}, 400)

        if u.path == "/api/project_chat":
            cwd = body.get("cwd")
            cwd = cwd.strip() if isinstance(cwd, str) else ""
            if not cwd:
                return self._json({"error": "cwd required"}, 400)
            try:
                return self._json(_project_chat(question, cwd))
            except Exception as e:
                return self._json({"error": f"project chat failed: {e}"}, 500)

        path = _safe_session_path(body.get("path"))
        if path is None:
            return self._json({"error": "path not allowed"}, 400)
        try:
            return self._json(_chat(question, path))
        except Exception as e:
            return self._json({"error": f"chat failed: {e}"}, 500)

    # -- static file serving (the built UI) --------------------------------- #
    def _serve_static(self, path: str):
        """Serve a file from DIST or PUBLIC, falling back to the SPA index.

        A candidate is served only when its resolved path lies inside the
        resolved root.
        """
        rel = path.lstrip("/") or "index.html"
        for root in (DIST, PUBLIC):
            base = root.resolve()
            cand = (root / rel).resolve()
            # Containment is checked on path COMPONENTS. A string-prefix test
            # would also accept a sibling such as "<root>-backup/…", reachable
            # through "../" segments in the request path.
            if base in cand.parents and cand.is_file():
                return self._send(200, cand.read_bytes(), _ctype(cand))
        # SPA fallback
        idx = DIST / "index.html"
        if idx.is_file():
            return self._send(200, idx.read_bytes(), "text/html")
        return self._send(
            404,
            b"UI not built. Run: cd ui && npm run build (or use vite dev on :5173)",
            "text/plain",
        )
| def _ctype(p: Path) -> str: | |
| return { | |
| ".html": "text/html", ".js": "text/javascript", ".css": "text/css", | |
| ".json": "application/json", ".svg": "image/svg+xml", ".png": "image/png", | |
| ".ico": "image/x-icon", ".woff2": "font/woff2", ".woff": "font/woff", | |
| }.get(p.suffix, "application/octet-stream") | |
def main():
    """Start the server on (HOST, PORT) and serve until interrupted.

    Prints a short start-up banner, starts the background enricher unless
    ``HER_ENRICH=0`` is set, then blocks in ``serve_forever()``. The server is
    used as a context manager so the listening socket is closed on the way out,
    including after Ctrl-C.
    """
    with ThreadingHTTPServer((HOST, PORT), Handler) as httpd:
        print(f"Her · हेर — server on http://{HOST}:{PORT} (UI + /api, 100% local)")
        print(f" dist: {DIST} ({'built' if (DIST/'index.html').exists() else 'NOT built — run npm run build'})")
        if os.environ.get("HER_ENRICH") == "0":
            print(" enricher: OFF (HER_ENRICH=0)")
        else:
            print(" enricher: passive background (bare binary names -> npm/brew/pypi; HER_ENRICH=0 to disable)")
            _start_enricher()
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # serve_forever() has already returned on this thread, so there is no
            # loop left for shutdown() to stop; leaving the with-block closes the
            # socket.
            pass


if __name__ == "__main__":
    main()