Spaces:
Running on Zero
Running on Zero
| """provenance.py — direct vs indirect VALUE-FLOW, with DISTINCTIVE-value matching. | |
| NON-NEGOTIABLE #4: a call is `indirect` (PROVEN value-flow) ONLY when a distinctive | |
| value from its input appears VERBATIM in an EARLIER tool_result text. Temporal | |
| proximity is NOT used here — proximity is a hypothesis the narrator may raise, never | |
| a proven edge. | |
| PRECISION IS THE RELEASE GATE. The POC over-fired because it matched the shared | |
| cwd/repo path (which appears in nearly every tool input) and called everything | |
| indirect. Guards, in order: | |
| 1. Strip the common cwd/repo prefix before judging length/distinctiveness. | |
| 2. DROP any candidate value shared across more than `shared_tool_cap` input tools | |
| (default 10) — config knob. The cwd prefix and ubiquitous tokens die here. | |
| 3. Bare identifiers must be >= `ident_min` chars (default 12). Paths are exempt | |
| (they are matched verbatim, by stripped-relative form, or by a distinctive | |
| basename >= `basename_min` chars). | |
| 4. Never let a value that *equals* the cwd (or its rstripped form) match. | |
| 5. Candidates are searched in priority order: URLs, then UUIDs, then bare | |
| identifiers, then paths — so the recorded flowValue prefers the most | |
| distinctive evidence (a hash/id/URL over a bare directory). | |
| All thresholds are config knobs on `ProvenanceConfig`. Strictness up = fewer, | |
| crisper indirect edges. Defaults are calibrated so the fixture lands at the | |
| verified ~82% (0.78-0.86) agent-driven ratio without manufacturing false edges. | |
| Pure code. NO model. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| from dataclasses import dataclass | |
| from typing import Any, Optional | |
| # --------------------------------------------------------------------------- # | |
| # candidate extraction | |
| # --------------------------------------------------------------------------- # | |
# http(s) URLs: the match runs until whitespace, a quote, an angle bracket,
# or a closing ")", "]" or "}".
_URL_RE = re.compile(r"https?://[^\s\"'<>)\]}]+")
# canonical 8-4-4-4-12 hexadecimal UUIDs, matched case-insensitively (re.I)
_UUID_RE = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I
)
# absolute / slash-bearing path-like tokens (must start with "/")
_PATH_RE = re.compile(r"/[A-Za-z0-9_./\-]+")
# bare identifiers >= 12 chars (env var names, service names, hashes, slugs).
# NOTE: the character class also admits "." and "/", so path-like tokens of
# 12+ chars match here too; the 12-char floor is hard-coded in this pattern.
_IDENT_RE = re.compile(r"[A-Za-z0-9_./\-]{12,}")
@dataclass
class ProvenanceConfig:
    """All strictness knobs. Raise to be stricter (fewer indirect edges).

    Declared as a dataclass so individual knobs can be overridden at
    construction time, e.g. ``ProvenanceConfig(shared_tool_cap=5)``; calling
    ``ProvenanceConfig()`` with no arguments yields the defaults below.
    """

    # drop values appearing in > this many input tools
    shared_tool_cap: int = 10
    # min length for a bare (non-path) identifier candidate
    ident_min: int = 12
    # min length for a path's basename to count as a match
    basename_min: int = 8
    # min length for a stripped-relative path to count
    rel_path_min: int = 8
def _is_url(v: str) -> bool:
    """Return True when the WHOLE string ``v`` is matched by ``_URL_RE``."""
    whole = _URL_RE.fullmatch(v)
    return bool(whole)
def _is_uuid(v: str) -> bool:
    """Return True when the WHOLE string ``v`` is matched by ``_UUID_RE``."""
    whole = _UUID_RE.fullmatch(v)
    return bool(whole)
| def _input_text(inp: Any) -> str: | |
| """Flatten a tool input's string/number fields into one searchable string.""" | |
| if isinstance(inp, dict): | |
| parts: list[str] = [] | |
| for v in inp.values(): | |
| if isinstance(v, str): | |
| parts.append(v) | |
| elif isinstance(v, (int, float)): | |
| parts.append(str(v)) | |
| return " ".join(parts) | |
| if inp is None: | |
| return "" | |
| return str(inp) | |
def _candidates(inp: Any) -> set[str]:
    """Collect every URL, UUID, path and identifier token found in a tool input.

    The input is flattened with ``_input_text`` and scanned by each of the four
    module-level patterns; all matches are pooled into one de-duplicated set.
    """
    text = _input_text(inp)
    found: set[str] = set()
    for pattern in (_URL_RE, _UUID_RE, _PATH_RE, _IDENT_RE):
        found.update(pattern.findall(text))
    return found
def _priority(c: str) -> int:
    """Rank a candidate for search order; lower ranks are searched first.

    0 = URL, 1 = UUID, 2 = bare identifier (no slash), 3 = slash-bearing path.
    Searching in this order makes the recorded flowValue favour the strongest
    kind of evidence available.
    """
    if _is_url(c):
        rank = 0
    elif _is_uuid(c):
        rank = 1
    elif "/" not in c:
        rank = 2  # bare identifier
    else:
        rank = 3  # bare path last (least distinctive of the set)
    return rank
| def _strip_prefix(v: str, cwd: Optional[str]) -> str: | |
| if cwd and v.startswith(cwd): | |
| return v[len(cwd):].lstrip("/") | |
| return v | |
| # --------------------------------------------------------------------------- # | |
| # the analysis | |
| # --------------------------------------------------------------------------- # | |
def annotate_provenance(
    turns,
    session: Optional[dict] = None,
    config: Optional[ProvenanceConfig] = None,
) -> None:
    """Set provenance / sourceTool / flowValue on every ToolCall, and
    Turn.direct / Turn.indirect counts. In place. Pure, deterministic.

    Walks tools in chronological (turn, then within-turn) order. For each tool,
    searches the running history of EARLIER tool_result texts for a distinctive
    candidate from this tool's input. First distinctive verbatim hit → indirect.

    Candidates are tried in a total order — priority class first, then longer
    values before shorter ones, then lexicographically — so the recorded
    flowValue / sourceTool never depend on set iteration order (which varies
    between processes under string-hash randomization).

    Args:
        turns: re-iterable collection of turn objects exposing ``.tools``; each
            tool call exposes ``.input``, ``.result_text`` and ``.name``.
        session: optional mapping; only its ``"cwd"`` entry is read.
        config: strictness knobs; defaults to ``ProvenanceConfig()``.

    Returns:
        None. Every tool call gets ``provenance`` ("direct" | "indirect"),
        ``flowValue`` and ``sourceTool``; every turn gets ``direct`` and
        ``indirect`` counts.
    """
    cfg = config or ProvenanceConfig()
    cwd = session.get("cwd") if session else None
    cwd_rstrip = cwd.rstrip("/") if cwd else None

    # chronological flat list of (turn, ToolCall)
    flat: list[tuple[Any, Any]] = [(t, tc) for t in turns for tc in t.tools]

    # extract each tool's candidates once; reused for the shared-cap census
    # below and for the per-tool search
    tool_candidates: list[set[str]] = [_candidates(tc.input) for _t, tc in flat]

    # how many distinct INPUT tools each candidate value appears in → shared cap
    # (each per-tool set holds a value at most once, so a plain count suffices)
    shared_count: dict[str, int] = {}
    for cands in tool_candidates:
        for c in cands:
            shared_count[c] = shared_count.get(c, 0) + 1

    def _distinctive(c: str) -> bool:
        # never the bare cwd itself
        if cwd and (c == cwd or c == cwd_rstrip):
            return False
        # drop values shared across too many input tools (kills the cwd prefix
        # and other ubiquitous tokens — the POC's false-positive engine)
        if shared_count.get(c, 0) > cfg.shared_tool_cap:
            return False
        # bare identifiers must clear the length floor; paths are exempt (matched
        # by verbatim / stripped-relative / distinctive basename below)
        if "/" not in c and len(c) < cfg.ident_min:
            return False
        return True

    def _search_order(c: str) -> tuple[int, int, str]:
        # total order: priority class, then longest first, then lexicographic
        return (_priority(c), -len(c), c)

    def _find_flow(
        c: str, history: list[tuple[int, str]]
    ) -> Optional[tuple[str, int]]:
        """Return (flow value, source index) for the earliest result matching c."""
        is_path = "/" in c
        rel = _strip_prefix(c, cwd) if is_path else c
        bn = os.path.basename(c) if is_path else ""
        for pidx, ptext in history:
            # 1) verbatim full candidate in an earlier result → strongest
            if c in ptext:
                return c, pidx
            if is_path:
                # 2) stripped-relative path verbatim (handles SRC vs PROD,
                #    /tmp paths printed then reused, etc.)
                if len(rel) >= cfg.rel_path_min and rel in ptext:
                    return c, pidx
                # 3) distinctive basename (e.g. apply.js, migrate.js) printed
                #    in an earlier result then opened/run later
                if len(bn) >= cfg.basename_min and bn in ptext:
                    return bn, pidx
        return None

    # running history of earlier non-empty results: list of (global_idx, text)
    history: list[tuple[int, str]] = []
    for gi, (_turn, tc) in enumerate(flat):
        hit: Optional[tuple[str, int]] = None
        ordered = sorted(
            (c for c in tool_candidates[gi] if _distinctive(c)),
            key=_search_order,
        )
        for c in ordered:
            hit = _find_flow(c, history)
            if hit is not None:
                break

        if hit is not None:
            found_value, source_idx = hit
            tc.provenance = "indirect"
            tc.flowValue = found_value
            tc.sourceTool = flat[source_idx][1].name
        else:
            tc.provenance = "direct"
            tc.flowValue = None
            tc.sourceTool = None

        # append THIS tool's result to history (only earlier results are visible
        # to later tools — strict causal ordering)
        if tc.result_text:
            history.append((gi, tc.result_text))

    # per-turn direct / indirect counts
    for t in turns:
        t.direct = sum(1 for tc in t.tools if tc.provenance == "direct")
        t.indirect = sum(1 for tc in t.tools if tc.provenance == "indirect")