RFTSystems's picture
Create drp/diff.py
69ba747 verified
raw
history blame
4.73 kB
import difflib
from typing import Any, Dict, List, Optional, Tuple
from .bundle import Bundle, load_bundle
def _normalize_for_compare(x: Any) -> Any:
# Avoid false diffs from ordering
if isinstance(x, dict):
return {k: _normalize_for_compare(x[k]) for k in sorted(x.keys())}
if isinstance(x, list):
return [_normalize_for_compare(v) for v in x]
return x
def _json_diff(a: Any, b: Any, path: str = "") -> List[Dict[str, Any]]:
"""
Small recursive diff (no heavy deps).
Emits list of {path, a, b, kind}.
"""
diffs: List[Dict[str, Any]] = []
if type(a) != type(b):
diffs.append({"path": path or "$", "kind": "type", "a": str(type(a)), "b": str(type(b))})
return diffs
if isinstance(a, dict):
akeys = set(a.keys())
bkeys = set(b.keys())
for k in sorted(akeys - bkeys):
diffs.append({"path": f"{path}.{k}" if path else k, "kind": "removed", "a": a[k], "b": None})
for k in sorted(bkeys - akeys):
diffs.append({"path": f"{path}.{k}" if path else k, "kind": "added", "a": None, "b": b[k]})
for k in sorted(akeys & bkeys):
diffs.extend(_json_diff(a[k], b[k], f"{path}.{k}" if path else k))
return diffs
if isinstance(a, list):
# list diff by index (simple)
n = max(len(a), len(b))
for i in range(n):
pa = a[i] if i < len(a) else None
pb = b[i] if i < len(b) else None
if i >= len(a):
diffs.append({"path": f"{path}[{i}]", "kind": "added", "a": None, "b": pb})
elif i >= len(b):
diffs.append({"path": f"{path}[{i}]", "kind": "removed", "a": pa, "b": None})
else:
diffs.extend(_json_diff(pa, pb, f"{path}[{i}]"))
return diffs
if a != b:
diffs.append({"path": path or "$", "kind": "value", "a": a, "b": b})
return diffs
def _classify_divergence(ev_a: Dict[str, Any], ev_b: Dict[str, Any]) -> str:
ka = ev_a.get("kind")
kb = ev_b.get("kind")
if ka != kb:
return "control-flow"
if ka in ("tool_call", "tool_result"):
return "tool"
if ka in ("memory_write", "memory_read"):
return "memory"
if ka in ("llm_sample", "llm_call"):
return "sampling"
if ka in ("guardrail",):
return "governance"
return "state"
def _text_delta(a: str, b: str) -> str:
a_lines = a.splitlines()
b_lines = b.splitlines()
diff = difflib.unified_diff(a_lines, b_lines, fromfile="A", tofile="B", lineterm="")
return "\n".join(diff)
def diff_bundles(zip_a: str, zip_b: str) -> Dict[str, Any]:
A = load_bundle(zip_a)
B = load_bundle(zip_b)
ea = A.events
eb = B.events
n = min(len(ea), len(eb))
first_div: Optional[int] = None
per_event: List[Dict[str, Any]] = []
for i in range(n):
na = _normalize_for_compare({k: ea[i].get(k) for k in ("kind", "step", "payload")})
nb = _normalize_for_compare({k: eb[i].get(k) for k in ("kind", "step", "payload")})
if na != nb and first_div is None:
first_div = i
if na != nb:
diffs = _json_diff(na, nb)
item = {
"i": i,
"step_a": ea[i].get("step"),
"step_b": eb[i].get("step"),
"kind_a": ea[i].get("kind"),
"kind_b": eb[i].get("kind"),
"class": _classify_divergence(ea[i], eb[i]),
"diffs": diffs[:200], # cap
}
# Optional friendly text diff if payload has 'text'
ta = ea[i].get("payload", {}).get("text")
tb = eb[i].get("payload", {}).get("text")
if isinstance(ta, str) and isinstance(tb, str) and ta != tb:
item["text_unified_diff"] = _text_delta(ta, tb)[:20000]
per_event.append(item)
# length mismatch
if len(ea) != len(eb):
first_div = first_div if first_div is not None else n
summary = {
"run_a": A.manifest.get("run_id"),
"run_b": B.manifest.get("run_id"),
"framework_a": A.manifest.get("framework"),
"framework_b": B.manifest.get("framework"),
"model_a": A.manifest.get("model_id"),
"model_b": B.manifest.get("model_id"),
"events_a": len(ea),
"events_b": len(eb),
"first_divergence_index": first_div,
}
# simple counts by class
counts: Dict[str, int] = {}
for item in per_event:
counts[item["class"]] = counts.get(item["class"], 0) + 1
out = {
"summary": summary,
"class_counts": counts,
"differences": per_event[:400], # cap for UI
}
return out