# her / engine /core /provenance.py
# geekwrestler's picture
# Squash history (purge pre-scrub demo session blobs)
# 5f43c7d
"""provenance.py — direct vs indirect VALUE-FLOW, with DISTINCTIVE-value matching.
NON-NEGOTIABLE #4: a call is `indirect` (PROVEN value-flow) ONLY when a distinctive
value from its input appears VERBATIM in an EARLIER tool_result text. Temporal
proximity is NOT used here — proximity is a hypothesis the narrator may raise, never
a proven edge.
PRECISION IS THE RELEASE GATE. The POC over-fired because it matched the shared
cwd/repo path (which appears in nearly every tool input) and called everything
indirect. Guards, in order:
1. Strip the common cwd/repo prefix before judging length/distinctiveness.
2. DROP any candidate value shared across more than `shared_tool_cap` input tools
(default 10) — config knob. The cwd prefix and ubiquitous tokens die here.
3. Bare identifiers must be >= `ident_min` chars (default 12). Paths are exempt
(they are matched verbatim, by stripped-relative form, or by a distinctive
basename >= `basename_min` chars).
4. Never let a value that *equals* the cwd (or its rstripped form) match.
5. Candidates are searched in priority order: URLs, then UUIDs, then bare
identifiers, then paths — so the recorded flowValue prefers the most
distinctive evidence (a hash/id/URL over a bare directory).
All thresholds are config knobs on `ProvenanceConfig`. Strictness up = fewer,
crisper indirect edges. Defaults are calibrated so the fixture lands at the
verified ~82% (0.78-0.86) agent-driven ratio without manufacturing false edges.
Pure code. NO model.
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from typing import Any, Optional
# --------------------------------------------------------------------------- #
# candidate extraction
# --------------------------------------------------------------------------- #
_URL_RE = re.compile(r"https?://[^\s\"'<>)\]}]+")
_UUID_RE = re.compile(
r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I
)
# absolute / slash-bearing path-like tokens
_PATH_RE = re.compile(r"/[A-Za-z0-9_./\-]+")
# bare identifiers >= 12 chars (env var names, service names, hashes, slugs)
_IDENT_RE = re.compile(r"[A-Za-z0-9_./\-]{12,}")
@dataclass
class ProvenanceConfig:
    """Thresholds that control how readily a value counts as proven flow.

    To get fewer indirect edges, LOWER ``shared_tool_cap`` or RAISE any of the
    three minimum lengths.
    """

    # A candidate value seen in MORE than this many tool inputs is discarded.
    shared_tool_cap: int = 10
    # Shortest acceptable bare (slash-free) identifier candidate.
    ident_min: int = 12
    # Shortest path basename that may stand in for the full path as a match.
    basename_min: int = 8
    # Shortest cwd-stripped relative path that may count as a match.
    rel_path_min: int = 8
def _is_url(v: str) -> bool:
    """Return True when the whole of *v* is one http(s) URL."""
    # A successful fullmatch yields a Match object, which is always truthy.
    return bool(_URL_RE.fullmatch(v))
def _is_uuid(v: str) -> bool:
    """Return True when the whole of *v* is one hyphenated hex UUID."""
    # A successful fullmatch yields a Match object, which is always truthy.
    return bool(_UUID_RE.fullmatch(v))
def _input_text(inp: Any) -> str:
"""Flatten a tool input's string/number fields into one searchable string."""
if isinstance(inp, dict):
parts: list[str] = []
for v in inp.values():
if isinstance(v, str):
parts.append(v)
elif isinstance(v, (int, float)):
parts.append(str(v))
return " ".join(parts)
if inp is None:
return ""
return str(inp)
def _candidates(inp: Any) -> set[str]:
    """Collect every URL, UUID, path-like token and long identifier in *inp*.

    The input is flattened to text first; the result is the union of all
    matches of the four module-level patterns, with duplicates merged.
    """
    text = _input_text(inp)
    found: set[str] = set()
    # None of the patterns has a capture group, so findall yields plain strings.
    for pattern in (_URL_RE, _UUID_RE, _PATH_RE, _IDENT_RE):
        found.update(pattern.findall(text))
    return found
def _priority(c: str) -> int:
    """Rank a candidate for search order; lower ranks are tried first.

    0 = URL, 1 = UUID, 2 = bare identifier (no slash), 3 = anything else that
    contains a slash (treated as a path, the least distinctive kind).
    """
    if _is_url(c):
        rank = 0
    elif _is_uuid(c):
        rank = 1
    else:
        rank = 3 if "/" in c else 2
    return rank
def _strip_prefix(v: str, cwd: Optional[str]) -> str:
if cwd and v.startswith(cwd):
return v[len(cwd):].lstrip("/")
return v
# --------------------------------------------------------------------------- #
# the analysis
# --------------------------------------------------------------------------- #
def annotate_provenance(
    turns,
    session: Optional[dict] = None,
    config: Optional[ProvenanceConfig] = None,
) -> None:
    """Label every tool call as ``direct`` or ``indirect``, in place.

    Tool calls are visited in chronological order (turn order, then order
    within ``turn.tools``). A call is ``indirect`` when a distinctive candidate
    extracted from its input is found in the ``result_text`` of an EARLIER
    call; otherwise it is ``direct``. Candidates are tried in ``_priority``
    order with ties broken by the candidate string itself, so the outcome does
    not depend on set iteration order and is reproducible across runs.

    Args:
        turns: Iterable of turn objects. Each must expose ``tools`` (iterable
            of tool calls). Each tool call is read via ``input``,
            ``result_text`` and ``name``. ``turns`` is iterated twice, so it
            must be re-iterable (e.g. a list).
        session: Optional mapping; only its ``"cwd"`` entry is used.
        config: Thresholds; a default ``ProvenanceConfig`` is used when falsy.

    Side effects:
        Sets ``provenance``, ``flowValue`` and ``sourceTool`` on every tool
        call, and ``direct`` / ``indirect`` counts on every turn.
    """
    cfg = config or ProvenanceConfig()
    cwd = session.get("cwd") if session else None
    cwd_rstrip = cwd.rstrip("/") if cwd else None

    # Chronological flat list of (turn, tool_call).
    flat: list[tuple[Any, Any]] = [(t, tc) for t in turns for tc in t.tools]

    # Extract candidates once per tool; reused for the shared-value census
    # below and for the matching pass.
    tool_candidates: list[set[str]] = [_candidates(tc.input) for _t, tc in flat]

    # Number of distinct tool inputs each candidate value appears in. Each
    # per-tool collection is a set, so a value counts at most once per tool.
    tools_using: dict[str, int] = {}
    for cands in tool_candidates:
        for c in cands:
            tools_using[c] = tools_using.get(c, 0) + 1

    def _distinctive(c: str) -> bool:
        """Return True when *c* is specific enough to be evidence of flow."""
        # Never the bare cwd itself (with or without trailing slash).
        if cwd and (c == cwd or c == cwd_rstrip):
            return False
        # Values shared by too many tool inputs are ubiquitous, not evidence.
        if tools_using.get(c, 0) > cfg.shared_tool_cap:
            return False
        # Slash-free identifiers must clear the length floor; slash-bearing
        # values are exempt here and judged by the path rules when matching.
        if "/" not in c and len(c) < cfg.ident_min:
            return False
        return True

    # Results of already-processed tools as (global_idx, text). Only non-empty
    # texts are ever appended, so the scan needs no emptiness check.
    history: list[tuple[int, str]] = []

    def _first_source(c: str) -> Optional[tuple[str, int]]:
        """Return (flow_value, source_idx) for the earliest result citing *c*."""
        is_path = "/" in c
        # Loop-invariant per candidate: compute the alternate forms once.
        rel = _strip_prefix(c, cwd) if is_path else ""
        bn = os.path.basename(c) if is_path else ""
        rel_ok = is_path and len(rel) >= cfg.rel_path_min
        bn_ok = is_path and len(bn) >= cfg.basename_min
        for pidx, ptext in history:
            # 1) the full candidate verbatim -> strongest evidence
            if c in ptext:
                return c, pidx
            # 2) cwd-stripped relative form verbatim (still reported as c)
            if rel_ok and rel in ptext:
                return c, pidx
            # 3) a sufficiently long basename printed earlier (reported as
            #    the basename, since that is what actually matched)
            if bn_ok and bn in ptext:
                return bn, pidx
        return None

    for gi, (_turn, tc) in enumerate(flat):
        hit: Optional[tuple[str, int]] = None
        # Secondary key on the string makes tie order deterministic; sorting a
        # set by priority alone would inherit hash-randomised iteration order.
        for c in sorted(tool_candidates[gi], key=lambda v: (_priority(v), v)):
            if not _distinctive(c):
                continue
            hit = _first_source(c)
            if hit is not None:
                break

        if hit is not None:
            flow_value, source_idx = hit
            tc.provenance = "indirect"
            tc.flowValue = flow_value
            tc.sourceTool = flat[source_idx][1].name
        else:
            tc.provenance = "direct"
            tc.flowValue = None
            tc.sourceTool = None

        # Publish this tool's result only AFTER it has been judged, so a tool
        # can never match its own output (strict causal ordering).
        if tc.result_text:
            history.append((gi, tc.result_text))

    # Per-turn direct / indirect tallies.
    for t in turns:
        t.direct = sum(1 for tc in t.tools if tc.provenance == "direct")
        t.indirect = sum(1 for tc in t.tools if tc.provenance == "indirect")