r"""Counterfactual probe — semantic gradient estimation.

Takes evaluation results, identifies the movable middle, and asks the LLM to
estimate score deltas for hypothetical changes. Produces a Jacobian matrix and
aggregated gradient.

Usage:
    uv run python scripts/counterfactual.py \
        --tag baseline \
        --changes data/changes.json \
        --parallel 5

Pass --goal "..." to weight each evaluator's deltas by an LLM-scored relevance
to that goal (VJP mode).
"""

import argparse
import concurrent.futures
import json
import math
import os
import re
import time
from collections import Counter, defaultdict
from pathlib import Path

try:
    from dotenv import load_dotenv
except ImportError:  # python-dotenv is optional; variables may already be exported
    load_dotenv = None

PROJECT_ROOT = Path(__file__).resolve().parent.parent
if load_dotenv is not None:
    load_dotenv(PROJECT_ROOT / ".env")

try:
    from openai import OpenAI
except ImportError:  # only main() needs the client; keep the analysis helpers importable
    OpenAI = None


SYSTEM_PROMPT = """You are performing counterfactual analysis on a prior evaluation.

You previously evaluated an entity from a specific persona's perspective and gave a score. Now estimate how SPECIFIC CHANGES to the entity would shift that score.

Rules:
- Stay fully in character as this persona
- Be realistic — some changes matter a lot, others barely register
- A change can be positive, negative, or neutral depending on this persona's values
- Consider second-order effects
- Score deltas reflect THIS persona's specific perspective

You MUST respond with valid JSON only."""

PROBE_PROMPT = """## Evaluator Persona
Name: {name}
Age: {age}
Location: {city}, {state}
Occupation: {occupation}

{persona}

## Their Original Evaluation
Score: {original_score}/10, Action: {original_action}
Reasoning: "{original_reasoning}"
Concerns: {original_concerns}

## Counterfactual Changes
For each change below, estimate the NEW score (1-10) if this change were applied.

{changes_block}

Return JSON:
{{
  "original_score": {original_score},
  "counterfactuals": [
    {{
      "change_id": "<id>",
      "new_score": <1-10>,
      "delta": <new_score minus original_score>,
      "impact": "<high|medium|low|none>",
      "reasoning": "<1 sentence — why this matters or doesn't to THEM>"
    }}
  ]
}}"""

GOAL_RELEVANCE_PROMPT = """You are scoring how relevant an evaluator is to a specific goal.

## Goal
{goal}

## Evaluator
Name: {name}, Age: {age}, Occupation: {occupation}
Their evaluation: {score}/10 — "{summary}"

## Task
On a scale of 0.0 to 1.0, how relevant is this evaluator's opinion to the stated goal?
- 1.0 = this is exactly the kind of person whose opinion matters for this goal
- 0.5 = somewhat relevant
- 0.0 = completely irrelevant to this goal

Return JSON only: {{"relevance": <0.0-1.0>, "reasoning": "<1 sentence>"}}"""

# Reasoning blocks ("<think>...</think>") that some reasoning models prepend to
# their answer; removed before JSON parsing.
_THINK_BLOCK_RE = re.compile(r"<think>[\s\S]*?</think>", re.IGNORECASE)
# A Markdown code fence wrapping the entire payload (```json ... ```).
_CODE_FENCE_RE = re.compile(r"^```(?:json)?\s*([\s\S]*?)\s*```$", re.IGNORECASE)


def _parse_llm_json(content):
    """Strip reasoning blocks and a wrapping code fence, then parse JSON.

    Raises:
        TypeError: if *content* is not a string (e.g. ``None``).
        ValueError: (``json.JSONDecodeError``) if the remainder is not JSON.
    """
    text = _THINK_BLOCK_RE.sub("", content).strip()
    fenced = _CODE_FENCE_RE.match(text)
    if fenced:
        text = fenced.group(1)
    return json.loads(text)


def _to_number(value, default=None):
    """Coerce an LLM-supplied numeric field to an int/float, else *default*.

    Accepts ints, finite floats and numeric strings such as "+2" or "-1.5".
    Integral floats become ints so reports print "+2" rather than "+2.0".
    Booleans, NaN/infinity and any other type yield *default*.
    """
    if isinstance(value, bool):
        return default
    if isinstance(value, str):
        try:
            value = float(value.strip())
        except ValueError:
            return default
    if isinstance(value, float):
        if not math.isfinite(value):
            return default
        return int(value) if value.is_integer() else value
    if isinstance(value, int):
        return value
    return default


def _counterfactual_delta(cf, original_score):
    """Return one counterfactual's score delta, or 0 when it cannot be determined.

    Prefers the model's explicit "delta"; when that is missing or non-numeric,
    falls back to ``new_score - original_score``.
    """
    delta = _to_number(cf.get("delta"))
    if delta is None:
        new_score = _to_number(cf.get("new_score"))
        base = _to_number(original_score)
        if new_score is not None and base is not None:
            delta = new_score - base
    return 0 if delta is None else delta


def _evaluator_key(ev):
    """Goal-weight key for an evaluator: "name_userid", or the bare name without a user_id."""
    name = ev.get("name", "")
    uid = ev.get("user_id", "")
    return f"{name}_{uid}" if uid else name


def build_changes_block(changes):
    """Render changes as numbered Markdown sections for the probe prompt.

    Each change dict must provide "id", "label" and "description".
    """
    lines = []
    for i, c in enumerate(changes, 1):
        lines.append(f"### Change {i}: {c['label']} (id: {c['id']})")
        lines.append(c["description"])
        lines.append("")
    return "\n".join(lines)


def _cohort_lookup(cohort_map, ev):
    """Look up persona by composite key (name_user_id), falling back to name.

    Returns an empty dict when neither key is present in *cohort_map*.
    """
    name = ev.get("name", "")
    uid = ev.get("user_id", "")
    key = f"{name}_{uid}"
    if key in cohort_map:
        return cohort_map[key]
    return cohort_map.get(name, {})


def probe_one(client, model, eval_result, cohort_map, all_changes):
    """Ask the LLM how each change would move one evaluator's score.

    Args:
        client: OpenAI-compatible client (``client.chat.completions.create``).
        model: model name passed to the API.
        eval_result: prior evaluation; must contain "score" and may carry
            "_evaluator", "action", "reasoning" and "concerns".
        cohort_map: persona lookup table (see ``_cohort_lookup``).
        all_changes: change dicts rendered by ``build_changes_block``.

    Returns:
        The parsed JSON object (expected to hold "counterfactuals") with the
        evaluator record attached under "_evaluator", or
        ``{"error": <message>, "_evaluator": <record>}`` on any failure.
    """
    ev = eval_result.get("_evaluator", {})
    persona_text = _cohort_lookup(cohort_map, ev).get("persona", "")
    prompt = PROBE_PROMPT.format(
        name=ev.get("name", ""),
        age=ev.get("age", ""),
        city=ev.get("city", ""),
        state=ev.get("state", ""),
        occupation=ev.get("occupation", ""),
        persona=persona_text,
        original_score=eval_result["score"],
        original_action=eval_result.get("action", ""),
        original_reasoning=eval_result.get("reasoning", ""),
        original_concerns=json.dumps(eval_result.get("concerns", []), ensure_ascii=False),
        changes_block=build_changes_block(all_changes),
    )
    try:
        resp = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            response_format={"type": "json_object"},
            max_tokens=16384,
            temperature=0.4,
        )
        content = resp.choices[0].message.content
        if not content:
            return {"error": "Empty response", "_evaluator": ev}
        result = _parse_llm_json(content)
        if not isinstance(result, dict):
            return {
                "error": f"Expected a JSON object, got {type(result).__name__}",
                "_evaluator": ev,
            }
        result["_evaluator"] = ev
        return result
    except Exception as e:  # best-effort per evaluator: record the failure, keep the run going
        return {"error": str(e), "_evaluator": ev}


def compute_goal_weights(client, model, eval_results, cohort_map, goal, parallel=5):
    """Score each evaluator's relevance to *goal* with one LLM call per evaluator.

    Args:
        client: OpenAI-compatible client.
        model: model name passed to the API.
        eval_results: evaluation dicts carrying "_evaluator" and optionally
            "score", "summary" or "reasoning".
        cohort_map: accepted for call compatibility; currently unused.
        goal: free-text goal the evaluators are scored against.
        parallel: number of concurrent API calls.

    Returns:
        ``{evaluator_key: {"weight": float in [0, 1], "reasoning": str}}``,
        keyed as ``analyze_gradient`` expects ("name_userid", or the bare name
        when there is no user_id). Calls that fail or return unusable JSON get
        weight 0.5 with reasoning "default".
    """

    def score_one(r):
        ev = r.get("_evaluator", {})
        key = _evaluator_key(ev)
        prompt = GOAL_RELEVANCE_PROMPT.format(
            goal=goal,
            name=ev.get("name", ""),
            age=ev.get("age", ""),
            occupation=ev.get("occupation", ""),
            score=r.get("score", "?"),
            summary=r.get("summary", r.get("reasoning", "")),
        )
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                response_format={"type": "json_object"},
                max_tokens=256,
                temperature=0.3,
            )
            data = _parse_llm_json(resp.choices[0].message.content)
        except Exception:  # best-effort: any API/parse failure gets a neutral weight
            return key, 0.5, "default"
        relevance = _to_number(data.get("relevance", 0.5)) if isinstance(data, dict) else None
        if relevance is None:
            return key, 0.5, "default"
        # Clamp so an out-of-range answer cannot invert or dominate the weighting.
        return key, min(max(float(relevance), 0.0), 1.0), data.get("reasoning", "")

    weights = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=parallel) as pool:
        futs = [pool.submit(score_one, r) for r in eval_results]
        for fut in concurrent.futures.as_completed(futs):
            key, weight, reasoning = fut.result()
            weights[key] = {"weight": weight, "reasoning": reasoning}
    return weights


def _build_jacobian(valid, goal_weights):
    """Group every counterfactual delta by change_id, attaching the evaluator's weight."""
    jacobian = defaultdict(list)
    for r in valid:
        ev = r.get("_evaluator", {})
        if goal_weights is not None:
            w = goal_weights.get(_evaluator_key(ev), {}).get("weight", 1.0)
        else:
            w = 1.0
        for cf in r.get("counterfactuals") or []:
            if not isinstance(cf, dict):
                continue  # malformed entry from the model
            delta = _counterfactual_delta(cf, r.get("original_score"))
            jacobian[cf.get("change_id", "")].append({
                "delta": delta,
                "weighted_delta": delta * w,
                "weight": w,
                "name": ev.get("name", ""),
                "age": ev.get("age", ""),
                "occupation": ev.get("occupation", ""),
                "reasoning": cf.get("reasoning", ""),
            })
    return jacobian


def _rank_changes(jacobian, labels):
    """Summarise each change's deltas and sort by weighted average, best first."""
    ranked = []
    for cid, deltas in jacobian.items():
        raw = [d["delta"] for d in deltas]
        # All-zero weights would divide by zero; fall back to 1 so the average is 0.
        total_weight = sum(d["weight"] for d in deltas) or 1
        ranked.append({
            "id": cid,
            "label": labels.get(cid, cid),
            "avg_delta": sum(d["weighted_delta"] for d in deltas) / total_weight,
            "raw_avg_delta": sum(raw) / len(raw),
            "max_delta": max(raw),
            "min_delta": min(raw),
            "positive": sum(1 for x in raw if x > 0),
            "negative": sum(1 for x in raw if x < 0),
            "n": len(deltas),
            "details": deltas,
        })
    ranked.sort(key=lambda x: x["avg_delta"], reverse=True)
    return ranked


def _render_report(ranked, n_valid, n_changes, has_goal, score_range):
    """Render the ranked changes as the Markdown gradient report."""
    lo, hi = score_range
    mode = "goal-weighted" if has_goal else "uniform"
    lines = [
        f"# Priority Actions ({mode})\n\n"
        f"Probed {n_valid} persuadable evaluators (scores {lo}-{hi}) across {n_changes} changes.\n"
        f"These are the people who are *almost* convinced — not the ones who were never your audience.\n"
    ]
    if has_goal:
        header = f"{'Rank':<5} {'VJP Δ':>6} {'Raw Δ':>6} {'Max':>5} {'Min':>5} Change"
    else:
        header = f"{'Rank':<5} {'Avg Δ':>6} {'Max':>5} {'Min':>5} {'👍':>4} {'👎':>4} Change"
    lines.append(header)
    lines.append("-" * 75)
    for i, r in enumerate(ranked, 1):
        if has_goal:
            lines.append(
                f"{i:<5} {r['avg_delta']:>+5.1f} {r['raw_avg_delta']:>+5.1f} "
                f"{r['max_delta']:>+4} {r['min_delta']:>+4} {r['label']}"
            )
        else:
            lines.append(
                f"{i:<5} {r['avg_delta']:>+5.1f} {r['max_delta']:>+4} {r['min_delta']:>+4} "
                f"{r['positive']:>3} {r['negative']:>3} {r['label']}"
            )

    lines.append("\n## Top 3 — Detail\n")
    sort_field = "weighted_delta" if has_goal else "delta"
    for r in ranked[:3]:
        heading = f"### {r['label']} (Δ {r['avg_delta']:+.1f})"
        if has_goal and abs(r["avg_delta"] - r["raw_avg_delta"]) > 0.2:
            heading += f" ← was {r['raw_avg_delta']:+.1f} without goal weighting"
        lines.append(heading + "\n")

        helps = sorted(
            (d for d in r["details"] if d["delta"] > 0),
            key=lambda d: d[sort_field],
            reverse=True,
        )
        if helps:
            lines.append("**Helps:**")
            for d in helps[:5]:
                w_label = f" [w={d['weight']:.1f}]" if has_goal else ""
                lines.append(f"  +{d['delta']} {d['name']} ({d['age']}){w_label}: {d['reasoning']}")

        hurts = sorted((d for d in r["details"] if d["delta"] < 0), key=lambda d: d["delta"])
        if hurts:
            lines.append("**Hurts:**")
            for d in hurts[:3]:
                w_label = f" [w={d['weight']:.1f}]" if has_goal else ""
                lines.append(f"  {d['delta']} {d['name']} ({d['age']}){w_label}: {d['reasoning']}")
        lines.append("")
    return "\n".join(lines)


def analyze_gradient(results, all_changes, goal_weights=None, score_range=(4, 7)):
    """Aggregate probe results into a ranked gradient report.

    Args:
        results: ``probe_one`` outputs; entries without "counterfactuals"
            (errors, placeholders) are ignored.
        all_changes: change dicts with at least "id" and "label".
        goal_weights: optional ``compute_goal_weights`` output. When given,
            each delta is weighted by its evaluator's relevance (VJP mode);
            evaluators missing from the mapping get weight 1.0.
        score_range: (min, max) original-score band that was probed; only
            used in the report text.

    Returns:
        ``(markdown_report, ranked_data)``. ranked_data is a list of
        per-change dicts sorted by weighted average delta, highest first, for
        structured consumers (web UI). ``("No valid results.", [])`` when no
        result is usable.
    """
    valid = [r for r in results if isinstance(r, dict) and "counterfactuals" in r]
    if not valid:
        return "No valid results.", []

    has_goal = goal_weights is not None
    labels = {c["id"]: c["label"] for c in all_changes}
    ranked = _rank_changes(_build_jacobian(valid, goal_weights), labels)
    report = _render_report(ranked, len(valid), len(all_changes), has_goal, score_range)

    ranked_data = [{
        "id": r["id"],
        "label": r["label"],
        "avg_delta": round(r["avg_delta"], 2),
        "raw_avg_delta": round(r["raw_avg_delta"], 2),
        "max_delta": r["max_delta"],
        "min_delta": r["min_delta"],
        "positive": r["positive"],
        "negative": r["negative"],
        "n": r["n"],
        "details": sorted([{
            "name": d["name"],
            "age": d.get("age", ""),
            "occupation": d.get("occupation", ""),
            "delta": d["delta"],
            "reasoning": d.get("reasoning", ""),
        } for d in r["details"]], key=lambda x: x["delta"], reverse=True),
    } for r in ranked]
    return report, ranked_data


def _load_json(path):
    """Read and parse a UTF-8 JSON file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _resolve_data_path(path_str):
    """Use *path_str* as given if it exists (or is absolute), else resolve it under PROJECT_ROOT."""
    path = Path(path_str)
    if path.is_absolute() or path.exists():
        return path
    return PROJECT_ROOT / path


def _flatten_changes(changes_data):
    """Accept a flat list of changes or a {category: list | {"changes": list}} dict."""
    if isinstance(changes_data, list):
        return list(changes_data)
    flat = []
    for cat in changes_data.values():
        flat.extend(cat if isinstance(cat, list) else cat.get("changes", []))
    return flat


def _in_score_band(eval_result, lo, hi):
    """True when the evaluation has a numeric (non-bool) score within [lo, hi]."""
    score = eval_result.get("score")
    return isinstance(score, (int, float)) and not isinstance(score, bool) and lo <= score <= hi


def _print_progress(done, total, result):
    """Print one progress line for a finished probe."""
    name = result.get("_evaluator", {}).get("name", "?")
    if "error" in result:
        print(f"  [{done}/{total}] {name}: ERROR")
        return
    original = result.get("original_score")
    cfs = [c for c in result.get("counterfactuals") or [] if isinstance(c, dict)]
    if cfs:
        top = max(cfs, key=lambda c: _counterfactual_delta(c, original))
        best, change_id = _counterfactual_delta(top, original), top.get("change_id", "?")
    else:
        best, change_id = 0, "?"
    print(f"  [{done}/{total}] {name} "
          f"(orig {result.get('original_score', '?')}) "
          f"best Δ: {best:+} from '{change_id}'")


def main():
    """CLI entry point: probe the movable middle of a run and write the gradient report.

    Reads results/<tag>/raw_results.json and meta.json, the cohort file named
    in meta.json (default data/cohort.json) and the --changes file; writes
    results/<tag>/counterfactual/raw_probes.json and gradient.md.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument("--tag", required=True)
    parser.add_argument("--changes", required=True, help="JSON file with changes to probe")
    parser.add_argument("--goal", default=None, help="Goal to optimize toward (enables VJP weighting)")
    parser.add_argument("--min-score", type=int, default=4)
    parser.add_argument("--max-score", type=int, default=7)
    parser.add_argument("--parallel", type=int, default=5)
    args = parser.parse_args()
    if args.min_score > args.max_score:
        parser.error("--min-score must not be greater than --max-score")
    if args.parallel < 1:
        parser.error("--parallel must be at least 1")

    run_dir = PROJECT_ROOT / "results" / args.tag
    eval_results = _load_json(run_dir / "raw_results.json")
    meta = _load_json(run_dir / "meta.json")
    cohort = _load_json(_resolve_data_path(meta.get("cohort", "data/cohort.json")))
    all_changes = _flatten_changes(_load_json(args.changes))

    # Composite "name_userid" keys avoid collisions between same-named personas;
    # bare-name keys (first persona wins) back the name fallback in _cohort_lookup.
    cohort_map = {f"{p.get('name', '')}_{p.get('user_id', '')}": p for p in cohort}
    for p in cohort:
        cohort_map.setdefault(p.get("name", ""), p)

    movable = [r for r in eval_results if _in_score_band(r, args.min_score, args.max_score)]

    if OpenAI is None:
        raise SystemExit("The 'openai' package is required to run the counterfactual probe.")
    client = OpenAI(api_key=os.getenv("LLM_API_KEY"), base_url=os.getenv("LLM_BASE_URL"))
    model = os.getenv("LLM_MODEL_NAME")

    print(f"Movable middle (score {args.min_score}-{args.max_score}): {len(movable)}")
    print(f"Changes: {len(all_changes)} | Model: {model}")
    if args.goal:
        print(f"Goal: {args.goal} (VJP mode)\n")
    else:
        print("No goal — uniform weighting\n")

    results = [None] * len(movable)
    t0 = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.parallel) as pool:
        futs = {
            pool.submit(probe_one, client, model, r, cohort_map, all_changes): i
            for i, r in enumerate(movable)
        }
        for done, fut in enumerate(concurrent.futures.as_completed(futs), 1):
            result = fut.result()
            results[futs[fut]] = result  # keep output order aligned with `movable`
            _print_progress(done, len(movable), result)

    print(f"\nDone in {time.perf_counter() - t0:.1f}s")

    out_dir = run_dir / "counterfactual"
    out_dir.mkdir(exist_ok=True)
    with open(out_dir / "raw_probes.json", "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    # Compute goal weights if goal is specified (VJP). Only probed evaluators
    # contribute deltas, so only they need a relevance score.
    goal_weights = None
    if args.goal:
        print("Computing goal-relevance weights...")
        goal_weights = compute_goal_weights(
            client, model, movable, cohort_map, args.goal, parallel=args.parallel,
        )
        relevant = sum(1 for v in goal_weights.values() if v["weight"] >= 0.5)
        print(f"  {relevant}/{len(goal_weights)} evaluators relevant to goal\n")

    gradient, _ranked = analyze_gradient(
        results, all_changes,
        goal_weights=goal_weights,
        score_range=(args.min_score, args.max_score),
    )

    with open(out_dir / "gradient.md", "w", encoding="utf-8") as f:
        f.write(gradient)

    print(f"\nGradient: {out_dir / 'gradient.md'}")
    print(f"\n{gradient}")


if __name__ == "__main__":
    main()