# Web file-viewer residue removed (Spaces: Sleeping; File size: 7,817 Bytes; 4d69237; line-number gutter 1-171).
import os
import json
import sys
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import CONFIG
from detection.pipeline import DetectionPipeline
from evaluation.benchmark import BenchmarkRunner
# Characters LaTeX treats specially in text mode, mapped to printable escapes.
# str.translate works in a single pass, so a backslash emitted by one
# replacement is never escaped again by another.
_LATEX_SPECIALS = str.maketrans({
    "\\": r"\textbackslash{}",
    "&": r"\&",
    "%": r"\%",
    "$": r"\$",
    "#": r"\#",
    "_": r"\_",
    "{": r"\{",
    "}": r"\}",
    "~": r"\textasciitilde{}",
    "^": r"\textasciicircum{}",
})


def _escape_latex(text, limit=None):
    """Return *text* as a LaTeX-safe, single-line string.

    Whitespace runs are collapsed so every table row stays on one source line.
    When *limit* is given, the text is shortened to at most *limit* characters
    (ending in "...") BEFORE escaping, so an escape sequence such as ``\\_`` can
    never be cut in half.
    """
    flat = " ".join(str(text).split())
    if limit is not None and len(flat) > limit:
        flat = flat[: limit - 3] + "..."
    return flat.translate(_LATEX_SPECIALS)


def _explain_false_positive(signals):
    """Return ``(dominant_signal, why_wrong)`` for a step wrongly flagged as hallucinated."""
    sem_sim = signals.get("semantic_similarity")
    nli = signals.get("nli_score")
    tcm = signals.get("tool_claim_match")
    contra = signals.get("contradiction_with_prev")

    # Each available signal is given a strength; boolean signals count as 1.0.
    # Low similarity is "strong", hence the 1 - sem_sim inversion.
    candidates = []
    if sem_sim is not None:
        candidates.append((1.0 - sem_sim, f"Low Semantic Similarity ({sem_sim:.2f})"))
    if nli is not None:
        candidates.append((nli, f"High NLI Contradiction ({nli:.2f})"))
    if tcm is False:
        candidates.append((1.0, "Tool Claim Mismatch"))
    if contra is True:
        candidates.append((1.0, "Contradiction with Previous Steps"))

    # Strictly-greater comparison: on a tie the earlier signal stays dominant.
    dominant_signal = "None"
    strongest = -1
    for strength, label in candidates:
        if strength > strongest:
            strongest = strength
            dominant_signal = label

    if nli is not None and nli > 0.75:
        why_wrong = "NLI model flagged benign semantic mismatch as factual contradiction."
    elif sem_sim is not None and sem_sim < 0.65:
        why_wrong = "Semantic similarity checker flagged synonym-rich correct reasoning as semantic drift."
    elif tcm is False:
        why_wrong = "Tool validator incorrectly identified formatting differences as a claim mismatch."
    else:
        why_wrong = "Aggressive signal fusion threshold triggered a false alarm."
    return dominant_signal, why_wrong


def _explain_false_negative(signals):
    """Return ``(dominant_signal, why_wrong)`` for a hallucinated step the detector missed."""
    sem_sim = signals.get("semantic_similarity")
    nli = signals.get("nli_score")

    dominant_signal = "None"
    if sem_sim is not None and sem_sim > 0.8:
        dominant_signal = f"High Semantic Similarity ({sem_sim:.2f})"
    elif nli is not None and nli < 0.3:
        dominant_signal = f"Low NLI Contradiction ({nli:.2f})"
    why_wrong = (
        "The hallucination was linguistically subtle or used matching terminology, "
        "bypassing the SLM ensemble."
    )
    return dominant_signal, why_wrong


def _build_latex_table(cases):
    """Render the given error records as a ``table*`` environment and return the LaTeX source."""
    header = r"""\begin{table*}[t]
\centering
\caption{Qualitative Error Analysis: Representative False Positives and False Negatives}
\begin{tabular}{lp{6cm}lp{3cm}p{5cm}}
\toprule
ID & Step Reasoning & Type & Dominant Signal & Error Cause (Rule-Based) \\
\midrule
"""
    footer = r"""\bottomrule
\end{tabular}
\end{table*}"""

    rows = []
    for err in cases:
        # The trajectory id is escaped too: ids such as "traj_001" contain an
        # underscore, which is an error in LaTeX text mode when left raw.
        row_id = _escape_latex(err["trajectory_id"]) + r"\_" + str(err["step"])
        reasoning = _escape_latex(err["agent_reasoning"] or "", limit=100)
        row_signal = _escape_latex(err["dominant_signal"])
        row_why = _escape_latex(err["why_wrong"])
        rows.append(
            f"{row_id} & {reasoning} & {err['error_type']} & {row_signal} & {row_why} \\\\\n"
        )
    return header + "".join(rows) + footer


def analyze_errors():
    """Run the detector over all loaded trajectories and report its misclassifications.

    Every step whose predicted ``hallucination_detected`` flag differs from its
    ``ground_truth_label`` is recorded as a false positive (FP) or false
    negative (FN) together with a rule-based explanation. The function then:

    * prints the ten errors with the largest error magnitude,
    * writes a LaTeX table of up to three FPs and three FNs to
      ``<project_root>/paper/figures/error_analysis_table.tex``,
    * writes all error records to
      ``<project_root>/evaluation/results/error_analysis.json``.

    Returns ``None``. If no trajectories are loaded, it prints an error message
    and returns without writing any file.
    """
    print("Running Qualitative Error Analysis on Synthetic Trajectories...", flush=True)

    # Use Layer 1 only — no OpenRouter API key needed for error analysis
    pipeline = DetectionPipeline(enable_layer2=False, enable_layer3=False)
    # The runner is used here only to load trajectories; detection is driven
    # step by step below so the per-step signals can be inspected.
    runner = BenchmarkRunner(detector_fn=pipeline.detect)
    runner.load_trajectories()

    if not runner.trajectories:
        print("Error: No trajectories loaded.")
        return

    errors = []
    total_trajs = len(runner.trajectories)
    for i, traj in enumerate(runner.trajectories):
        if (i + 1) % 10 == 0 or i == 0:
            print(f"Processing trajectory {i+1}/{total_trajs}...", flush=True)

        # NOTE(review): presumably clears per-trajectory detector state so one
        # trajectory's steps do not influence the next — confirm in pipeline.
        pipeline.reset_history()
        traj_id = traj.get("trajectory_id", f"traj_{i+1:03d}")

        for step in traj.get("steps", []):
            res = pipeline.detect(step)
            pred_detected = res.get("hallucination_detected", False)
            gt_detected = step.get("ground_truth_label", False)
            if pred_detected == gt_detected:
                continue  # correctly classified, nothing to analyse

            error_type = "FP" if pred_detected else "FN"

            # Tolerate explicit None values instead of crashing mid-run.
            conf = res.get("confidence")
            if conf is None:
                conf = 0.0
            signals = res.get("detection_signals") or {}

            if error_type == "FP":
                dominant_signal, why_wrong = _explain_false_positive(signals)
            else:
                dominant_signal, why_wrong = _explain_false_negative(signals)

            errors.append({
                "trajectory_id": traj_id,
                "step": step.get("step", 0),
                "error_type": error_type,
                "action": step.get("action", ""),
                "agent_reasoning": step.get("agent_reasoning", ""),
                "tool_output": step.get("tool_output", ""),
                "ground_truth_label": gt_detected,
                "confidence": conf,
                # Distance between the confidence and the 0/1 ground truth.
                # Assumes confidence is the detector's hallucination score in
                # [0, 1] — TODO confirm against the pipeline.
                "error_magnitude": abs(conf - (1 if gt_detected else 0)),
                "dominant_signal": dominant_signal,
                "why_wrong": why_wrong,
            })

    # Sort errors by magnitude (highest confidence wrong predictions first)
    errors.sort(key=lambda e: e["error_magnitude"], reverse=True)

    print("\n--- Top 10 Highest-Confidence Wrong Predictions ---")
    for idx, err in enumerate(errors[:10]):
        reasoning_preview = str(err["agent_reasoning"] or "")[:120]
        print(f"{idx+1}. Traj: {err['trajectory_id']}, Step: {err['step']}, Type: {err['error_type']}, "
              f"Conf: {err['confidence']:.4f}, Magnitude: {err['error_magnitude']:.4f}")
        print(f" Reasoning: {reasoning_preview}...")
        print(f" Dominant Signal: {err['dominant_signal']}")
        print(f" Why Wrong: {err['why_wrong']}")
        print()

    # Select up to 6 representative cases (3 FPs, 3 FNs); the list is already
    # sorted, so these are the largest-magnitude errors of each type.
    fps = [e for e in errors if e["error_type"] == "FP"]
    fns = [e for e in errors if e["error_type"] == "FN"]
    tex = _build_latex_table(fps[:3] + fns[:3])

    paper_dir = os.path.join(CONFIG.paths.project_root, "paper", "figures")
    os.makedirs(paper_dir, exist_ok=True)
    tex_path = os.path.join(paper_dir, "error_analysis_table.tex")
    with open(tex_path, "w", encoding="utf-8") as f:
        f.write(tex)
    print(f"LaTeX error table saved to: {tex_path}")

    out_dir = os.path.join(CONFIG.paths.project_root, "evaluation", "results")
    os.makedirs(out_dir, exist_ok=True)
    json_path = os.path.join(out_dir, "error_analysis.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(errors, f, indent=2)
    print(f"Error analysis JSON saved to: {json_path}")
# Script entry point: run the analysis only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    analyze_errors()
|