"""provenance.py — direct vs indirect VALUE-FLOW, with DISTINCTIVE-value matching.

NON-NEGOTIABLE #4: a call is `indirect` (PROVEN value-flow) ONLY when a
distinctive value from its input appears VERBATIM in an EARLIER tool_result
text. Temporal proximity is NOT used here — proximity is a hypothesis the
narrator may raise, never a proven edge.

PRECISION IS THE RELEASE GATE. The POC over-fired because it matched the
shared cwd/repo path (which appears in nearly every tool input) and called
everything indirect.

Guards, in order:

1. Strip the common cwd/repo prefix before judging length/distinctiveness.
   The prefix is only stripped on a directory boundary, so a sibling
   directory that merely shares leading characters with the cwd is left
   untouched.
2. DROP any candidate value shared across more than `shared_tool_cap` input
   tools (default 10) — config knob. The cwd prefix and ubiquitous tokens
   die here.
3. Bare identifiers must be >= `ident_min` chars (default 12). Paths are
   exempt (they are matched verbatim, by stripped-relative form, or by a
   distinctive basename >= `basename_min` chars).
4. Never let a value that *equals* the cwd match; the comparison ignores
   trailing slashes on both sides.
5. Candidates are searched in priority order: URLs, then UUIDs, then bare
   identifiers, then paths — so the recorded flowValue prefers the most
   distinctive evidence (a hash/id/URL over a bare directory). Ties within a
   class are broken longest-first, then lexicographically, so the recorded
   evidence is reproducible from run to run.

All thresholds are config knobs on `ProvenanceConfig`. Strictness up = fewer,
crisper indirect edges. Defaults are calibrated so the fixture lands at the
verified ~82% (0.78-0.86) agent-driven ratio without manufacturing false
edges.

Pure code. NO model.
"""

from __future__ import annotations

import os
import re
from dataclasses import dataclass
from typing import Any, Optional

# --------------------------------------------------------------------------- #
# candidate extraction
# --------------------------------------------------------------------------- #

_URL_RE = re.compile(r"https?://[^\s\"'<>)\]}]+")
_UUID_RE = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I
)
# absolute / slash-bearing path-like tokens
_PATH_RE = re.compile(r"/[A-Za-z0-9_./\-]+")
# bare identifiers >= 12 chars (env var names, service names, hashes, slugs).
# NOTE: the 12-char floor is baked into the pattern, independently of
# ProvenanceConfig.ident_min.
_IDENT_RE = re.compile(r"[A-Za-z0-9_./\-]{12,}")


@dataclass
class ProvenanceConfig:
    """All strictness knobs. Raise to be stricter (fewer indirect edges)."""

    # drop values appearing in > this many input tools
    shared_tool_cap: int = 10
    # min length for a bare (non-path) identifier candidate; values below 12
    # have no effect because _IDENT_RE never extracts shorter tokens
    ident_min: int = 12
    # min length for a path's basename to count as a match
    basename_min: int = 8
    # min length for a stripped-relative path to count
    rel_path_min: int = 8


def _is_url(v: str) -> bool:
    """Return True when the whole of *v* is a single http(s) URL."""
    return _URL_RE.fullmatch(v) is not None


def _is_uuid(v: str) -> bool:
    """Return True when the whole of *v* is a UUID (case-insensitive)."""
    return _UUID_RE.fullmatch(v) is not None


def _input_text(inp: Any) -> str:
    """Flatten a tool input's string/number fields into one searchable string.

    For a dict only the top-level str / int / float values are used, joined
    with single spaces (nested containers are skipped). ``None`` yields an
    empty string; anything else is passed through ``str()``.
    """
    if isinstance(inp, dict):
        parts: list[str] = []
        for v in inp.values():
            if isinstance(v, str):
                parts.append(v)
            elif isinstance(v, (int, float)):
                parts.append(str(v))
        return " ".join(parts)
    if inp is None:
        return ""
    return str(inp)


def _candidates(inp: Any) -> set[str]:
    """Distinctive candidate values from a tool input: URLs, UUIDs, paths, idents."""
    s = _input_text(inp)
    cset: set[str] = set()
    for pattern in (_URL_RE, _UUID_RE, _PATH_RE, _IDENT_RE):
        cset.update(pattern.findall(s))
    return cset


def _priority(c: str) -> int:
    """Search order: most distinctive first → flowValue prefers strong evidence."""
    if _is_url(c):
        return 0
    if _is_uuid(c):
        return 1
    if "/" in c:
        return 3  # bare path last (least distinctive of the set)
    return 2  # bare identifier


def _strip_prefix(v: str, cwd: Optional[str]) -> str:
    """Return *v* relative to *cwd* when *v* lies inside *cwd*, else *v* unchanged.

    The prefix is removed only on a path boundary: with cwd ``/home/u/proj``
    the value ``/home/u/project2/x`` is returned untouched instead of being
    mangled into ``ect2/x`` (which could then substring-match unrelated text).
    """
    if not cwd:
        return v
    base = cwd.rstrip("/")
    if v == base:
        return ""
    if v.startswith(base + "/"):
        return v[len(base):].lstrip("/")
    return v


def _find_in_history(
    c: str,
    cwd: Optional[str],
    history: list[tuple[int, str]],
    cfg: ProvenanceConfig,
) -> Optional[tuple[str, int]]:
    """Locate candidate *c* in the earlier result texts.

    History is scanned oldest-first; within one result text the checks are
    (1) verbatim candidate, (2) cwd-relative form, (3) basename — the last two
    only for slash-bearing candidates. Returns ``(flow_value, source_index)``
    for the first hit, or ``None`` when nothing matches.
    """
    rel: Optional[str] = None
    bn: Optional[str] = None
    if "/" in c:
        # Loop-invariant: compute once, and keep only the forms long enough
        # to count as evidence.
        stripped = _strip_prefix(c, cwd)
        if len(stripped) >= cfg.rel_path_min:
            rel = stripped
        base_name = os.path.basename(c)
        if len(base_name) >= cfg.basename_min:
            bn = base_name

    for pidx, ptext in history:
        # 1) verbatim full candidate in an earlier result → strongest
        if c in ptext:
            return c, pidx
        # 2) stripped-relative path verbatim (handles SRC vs PROD, /tmp paths
        #    printed then reused, etc.)
        if rel is not None and rel in ptext:
            return c, pidx
        # 3) distinctive basename (e.g. apply.js, migrate.js) printed in an
        #    earlier result then opened/run later
        if bn is not None and bn in ptext:
            return bn, pidx
    return None


# --------------------------------------------------------------------------- #
# the analysis
# --------------------------------------------------------------------------- #


def annotate_provenance(
    turns,
    session: Optional[dict] = None,
    config: Optional[ProvenanceConfig] = None,
) -> None:
    """Set provenance / sourceTool / flowValue on every ToolCall, and
    Turn.direct / Turn.indirect counts. In place. Pure, deterministic.

    Walks tools in chronological (turn, then within-turn) order. For each
    tool, searches the running history of EARLIER tool_result texts for a
    distinctive candidate from this tool's input. First distinctive verbatim
    hit → indirect.

    Args:
        turns: iterable of turn objects; each exposes ``tools``, whose items
            expose ``input``, ``result_text`` and ``name``. A one-shot
            iterable (e.g. a generator) is accepted.
        session: optional mapping; only its ``"cwd"`` entry is read.
        config: strictness knobs; defaults to ``ProvenanceConfig()``.

    Returns:
        None. Each tool call gets ``provenance`` ("direct" / "indirect"),
        ``flowValue`` and ``sourceTool``; each turn gets ``direct`` and
        ``indirect`` counts.
    """
    cfg = config or ProvenanceConfig()
    cwd = (session or {}).get("cwd")
    cwd_rstrip = cwd.rstrip("/") if cwd else None

    # Materialise once: `turns` is walked twice (flattening + per-turn counts),
    # which would silently skip the second pass for a generator.
    turns = list(turns)

    # chronological flat list of (turn, ToolCall)
    flat: list[tuple[Any, Any]] = [(t, tc) for t in turns for tc in t.tools]

    # Candidates per tool, extracted once and reused by both passes below.
    per_tool: list[set[str]] = [_candidates(tc.input) for _t, tc in flat]

    # how many distinct INPUT tools each candidate value appears in → shared cap
    # (each tool contributes a set, so a value is counted once per tool)
    tool_count: dict[str, int] = {}
    for cands in per_tool:
        for c in cands:
            tool_count[c] = tool_count.get(c, 0) + 1

    def _distinctive(c: str) -> bool:
        # never the bare cwd itself, with or without trailing slashes
        if cwd and c.rstrip("/") == cwd_rstrip:
            return False
        # drop values shared across too many input tools (kills the cwd prefix
        # and other ubiquitous tokens — the POC's false-positive engine)
        if tool_count.get(c, 0) > cfg.shared_tool_cap:
            return False
        # bare identifiers must clear the length floor; paths are exempt
        # (matched by verbatim / stripped-relative / distinctive basename)
        if "/" not in c and len(c) < cfg.ident_min:
            return False
        return True

    def _order(c: str) -> tuple[int, int, str]:
        # Total order: class priority, then longer (more specific) first, then
        # lexicographic. Without the tie-breakers the winner among same-class
        # candidates would follow set iteration order, which varies with
        # string hash randomisation.
        return (_priority(c), -len(c), c)

    # running history of earlier results: list of (global_idx, text)
    history: list[tuple[int, str]] = []

    for gi, (_turn, tc) in enumerate(flat):
        hit: Optional[tuple[str, int]] = None
        for c in sorted(per_tool[gi], key=_order):
            if not _distinctive(c):
                continue
            hit = _find_in_history(c, cwd, history, cfg)
            if hit is not None:
                break

        if hit is not None:
            found_value, source_idx = hit
            tc.provenance = "indirect"
            tc.flowValue = found_value
            tc.sourceTool = flat[source_idx][1].name
        else:
            tc.provenance = "direct"
            tc.flowValue = None
            tc.sourceTool = None

        # append THIS tool's result to history (only earlier results are
        # visible to later tools — strict causal ordering)
        if tc.result_text:
            history.append((gi, tc.result_text))

    # per-turn direct / indirect counts
    for t in turns:
        t.direct = sum(1 for tc in t.tools if tc.provenance == "direct")
        t.indirect = sum(1 for tc in t.tools if tc.provenance == "indirect")