"""
Braintrust Evaluations for the Contract Analysis Pipeline.
Evaluates the full LangGraph pipeline (ingestion → classification → risk_analysis →
benchmark → report) using:
- Braintrust Eval() framework
- Autoevals LLM-as-judge scorer (Factuality) via Claude Sonnet 4.6
- Custom heuristic scorers for each agent stage's output validity
Usage:
python agent-evaluation/eval.py
python agent-evaluation/eval.py --dataset eval_dataset.json --output eval_metrics.json
python agent-evaluation/eval.py --no-send-logs
python agent-evaluation/eval.py --debug
"""
import argparse
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any, Optional
# Allow imports from the project root (agents/, etc.)
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
import braintrust
from autoevals.llm import Factuality
from braintrust import Eval
from dotenv import load_dotenv
from openai import OpenAI
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)
logger = logging.getLogger(__name__)
os.chdir(_PROJECT_ROOT)
load_dotenv()
_EVAL_DIR = Path(__file__).resolve().parent
DEFAULT_DATASET_PATH = str(_EVAL_DIR / "eval_dataset.json")
DEFAULT_OUTPUT_PATH = str(_EVAL_DIR / "eval_metrics.json")
BRAINTRUST_PROJECT_NAME = os.environ.get("BRAINTRUST_PROJECT", "contract-pipeline-evals")
# Side-channel cache populated by the wrapped scorer so _export_eval_metrics
# can persist expected/found/matched type breakdowns to eval_metrics.json.
_clause_type_metadata: dict[str, dict] = {}
EVAL_JUDGE_MODEL = "claude-sonnet-4-6"
ANTHROPIC_OPENAI_BASE_URL = "https://api.anthropic.com/v1/"
braintrust.auto_instrument()
braintrust.init_logger(
api_key=os.environ.get("BRAINTRUST_API_KEY"),
project=BRAINTRUST_PROJECT_NAME,
)
def _create_judge_client() -> OpenAI:
"""
Create an OpenAI-compatible client pointing at Anthropic's API.
Autoevals scorers use the OpenAI SDK interface; Anthropic's compatible
endpoint lets us use Claude Sonnet 4.6 as the judge model.
"""
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY not set in environment")
return OpenAI(api_key=api_key, base_url=ANTHROPIC_OPENAI_BASE_URL)
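# Note: this client plugs into any autoevals LLM scorer via its `client`
# parameter; main() below wires it up as
#   Factuality(model=EVAL_JUDGE_MODEL, client=judge_client)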
def _load_dataset(dataset_path: str) -> list[dict]:
path = Path(dataset_path)
if not path.exists():
raise FileNotFoundError(f"Dataset not found: {dataset_path}")
with open(path) as f:
dataset = json.load(f)
logger.info(f"Loaded {len(dataset)} test cases from {dataset_path}")
return dataset
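# Illustrative dataset entry. Field names match what data() in
# _create_wrapped_task reads from each case; the values here are hypothetical:
# {
#   "input": "This Agreement shall terminate upon thirty (30) days notice...",
#   "expected_output": "The contract contains a termination clause...",
#   "category": "termination",
#   "difficulty": "easy",
#   "expected_clause_types": ["Termination For Convenience"]
# }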
def _run_pipeline_on_input(contract_text: str) -> dict:
"""
Run the full LangGraph pipeline on a contract and return all intermediate
state plus timing information.
"""
from agents.orchestrator_agent import run_pipeline
logger.info(f"Running pipeline on contract ({len(contract_text)} chars)...")
start = time.time()
try:
result = run_pipeline(contract_text)
elapsed = time.time() - start
report = json.loads(result["report"])
except Exception as exc:
elapsed = time.time() - start
logger.error(f"Pipeline failed: {exc}")
return {
"pipeline_error": str(exc),
"report": None,
"clauses": [],
"classified_clauses": [],
"risk_scores": [],
"benchmark_results": [],
"latency_seconds": elapsed,
}
return {
"pipeline_error": None,
"report": report,
"clauses": result.get("clauses", []),
"classified_clauses": result.get("classified_clauses", []),
"risk_scores": result.get("risk_scores", []),
"benchmark_results": result.get("benchmark_results", []),
"latency_seconds": elapsed,
}
# ---------------------------------------------------------------------------
# Custom Scorers
# ---------------------------------------------------------------------------
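# Every scorer below follows the Braintrust function-scorer contract: it
# receives the case's input/output/expected plus the metadata dict assembled
# in data(), and returns either a {"name", "score", "metadata"} dict or None
# to skip scoring that case.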
def clause_structure_validity_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Ingestion agent: all parsed clauses must have non-empty id, text, and section.
"""
if not metadata:
return None
clauses = metadata.get("clauses", [])
if not clauses:
return {
"name": "ClauseStructureValidity",
"score": 0.0,
"metadata": {"reason": "no clauses produced by ingestion agent"},
}
required = {"id", "text", "section"}
valid = sum(
1 for c in clauses
if required.issubset(c.keys()) and str(c.get("text", "")).strip()
)
score = valid / len(clauses)
return {
"name": "ClauseStructureValidity",
"score": score,
"metadata": {"total_clauses": len(clauses), "valid_clauses": valid},
}
def classification_validity_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Classification agent: each clause must have a non-empty clause_type and a
confidence value in [0, 1].
"""
if not metadata:
return None
classified = metadata.get("classified_clauses", [])
if not classified:
return {
"name": "ClassificationValidity",
"score": 0.0,
"metadata": {"reason": "no classified clauses"},
}
valid = sum(
1 for c in classified
if c.get("clause_type", "") and 0.0 <= c.get("confidence", -1) <= 1.0
)
score = valid / len(classified)
return {
"name": "ClassificationValidity",
"score": score,
"metadata": {"total": len(classified), "valid": valid},
}
def expected_clause_type_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Classification agent: fraction of expected CUAD clause types found in
the classified output. Penalises missing types; does not penalise extras.
"""
if not metadata:
return None
expected_types = metadata.get("expected_clause_types", [])
classified = metadata.get("classified_clauses", [])
if not expected_types or not classified:
return None
found = {c.get("clause_type", "") for c in classified}
expected_set = set(expected_types)
matched = expected_set & found
score = len(matched) / len(expected_set)
return {
"name": "ExpectedClauseType",
"score": score,
"metadata": {
"expected_types": sorted(expected_set),
"found_types": sorted(found),
"matched_types": sorted(matched),
},
}
def _expected_clause_type_scorer_with_cache(
    input: str,
    output: str,
    expected: Optional[str] = None,
    metadata: Optional[dict] = None,
) -> Optional[dict]:
    """
    Wrapper around expected_clause_type_scorer that additionally caches the
    expected/found/matched type breakdown in _clause_type_metadata, keyed by
    the case's input text, so _export_eval_metrics can persist it.
    """
    result = expected_clause_type_scorer(input, output, expected, metadata)
    if result and result.get("metadata"):
        _clause_type_metadata[input] = result["metadata"]
    return result
def risk_score_validity_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Risk analysis agent: risk_score must be in [0, 1] and risk_factors must be a list.
"""
if not metadata:
return None
risk_scores = metadata.get("risk_scores", [])
if not risk_scores:
return {
"name": "RiskScoreValidity",
"score": 0.0,
"metadata": {"reason": "no risk scores produced"},
}
valid = sum(
1 for c in risk_scores
if 0.0 <= c.get("risk_score", -1) <= 1.0
and isinstance(c.get("risk_factors"), list)
)
score = valid / len(risk_scores)
return {
"name": "RiskScoreValidity",
"score": score,
"metadata": {"total": len(risk_scores), "valid": valid},
}
def risk_factors_presence_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Risk analysis agent: high-risk clauses (score >= 0.5) should have at least one
identified risk factor, not an empty list.
"""
if not metadata:
return None
risk_scores = metadata.get("risk_scores", [])
high_risk = [c for c in risk_scores if c.get("risk_score", 0) >= 0.5]
if not high_risk:
return None
with_factors = sum(1 for c in high_risk if c.get("risk_factors", []))
score = with_factors / len(high_risk)
return {
"name": "RiskFactorsPresence",
"score": score,
"metadata": {"high_risk_clauses": len(high_risk), "with_factors": with_factors},
}
def benchmark_similarity_validity_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Benchmark agent: benchmark_similarity must be in [0, 1] and a source must be
provided for every clause.
"""
if not metadata:
return None
benchmark = metadata.get("benchmark_results", [])
if not benchmark:
return {
"name": "BenchmarkSimilarityValidity",
"score": 0.0,
"metadata": {"reason": "no benchmark results produced"},
}
valid = sum(
1 for c in benchmark
if 0.0 <= c.get("benchmark_similarity", -1) <= 1.0
and c.get("benchmark_source", "")
)
score = valid / len(benchmark)
return {
"name": "BenchmarkSimilarityValidity",
"score": score,
"metadata": {"total": len(benchmark), "valid": valid},
}
def output_structure_validity_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Report node: the final JSON report must contain a summary with total_clauses,
a non-empty clauses list, and each clause must have the required keys.
"""
if not metadata:
return None
report = metadata.get("report")
if report is None:
return {
"name": "OutputStructureValidity",
"score": 0.0,
"metadata": {"reason": "no report produced"},
}
checks = {
"has_summary": "summary" in report,
"has_clauses_key": "clauses" in report,
"has_total_clauses": "total_clauses" in report.get("summary", {}),
"clauses_is_list": isinstance(report.get("clauses"), list),
"clauses_not_empty": len(report.get("clauses", [])) > 0,
}
required_clause_keys = {"id", "clause_type", "risk_score", "benchmark_similarity"}
checks["clause_keys_valid"] = all(
required_clause_keys.issubset(c.keys())
for c in report.get("clauses", [{}])
)
score = sum(checks.values()) / len(checks)
return {
"name": "OutputStructureValidity",
"score": score,
"metadata": checks,
}
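# Minimal report shape that passes every check above (illustrative values):
# {
#   "summary": {"total_clauses": 1},
#   "clauses": [
#     {"id": "c1", "clause_type": "Governing Law",
#      "risk_score": 0.2, "benchmark_similarity": 0.9}
#   ]
# }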
def latency_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Pipeline latency scorer. Thresholds are tuned for a multi-agent pipeline
that makes ~3 sequential LLM calls per clause.
        < 30s     → 1.0 (fast)
        30–60s    → 0.75
        60–120s   → 0.5
        120–180s  → 0.25
        > 180s    → 0.0 (too slow)
"""
if not metadata:
return None
latency = metadata.get("latency_seconds")
if latency is None:
return None
if latency < 30:
score = 1.0
elif latency < 60:
score = 0.75
elif latency < 120:
score = 0.5
elif latency < 180:
score = 0.25
else:
score = 0.0
return {
"name": "Latency",
"score": score,
"metadata": {"latency_seconds": round(latency, 2)},
}
def no_error_scorer(
input: str,
output: str,
expected: Optional[str] = None,
metadata: Optional[dict] = None,
) -> Optional[dict]:
"""
Pipeline-level: score 1.0 if the pipeline completed without raising an
exception and produced a non-None report.
"""
if not metadata:
return None
error = metadata.get("pipeline_error")
if error:
return {
"name": "NoError",
"score": 0.0,
"metadata": {"error": str(error)[:200]},
}
if metadata.get("report") is None:
return {
"name": "NoError",
"score": 0.0,
"metadata": {"reason": "no report produced"},
}
return {"name": "NoError", "score": 1.0, "metadata": {}}
# ---------------------------------------------------------------------------
# Task Function and Data Loader
# ---------------------------------------------------------------------------
def _create_wrapped_task(dataset: list[dict]):
"""
Run the pipeline inside data() and cache results so that runtime metadata
(per-agent outputs, latency) is available to scorers via the metadata dict.
This mirrors the lab10 pattern: Braintrust passes metadata from data() to
every scorer, but pipeline outputs are only known at runtime — so we run
the pipeline here and inject the results into metadata.
"""
results_cache: dict[str, dict] = {}
def data() -> list[dict]:
cases = []
for case in dataset:
contract_text = case["input"]
category = case.get("category", "unknown")
logger.info(f"Running pipeline for test case: [{category}]")
result = _run_pipeline_on_input(contract_text)
results_cache[contract_text] = result
cases.append({
"input": contract_text,
"expected": case.get("expected_output", ""),
"metadata": {
"category": case.get("category", ""),
"difficulty": case.get("difficulty", ""),
"expected_clause_types": case.get("expected_clause_types", []),
# Runtime outputs injected for custom scorers
"clauses": result.get("clauses", []),
"classified_clauses": result.get("classified_clauses", []),
"risk_scores": result.get("risk_scores", []),
"benchmark_results": result.get("benchmark_results", []),
"report": result.get("report"),
"pipeline_error": result.get("pipeline_error"),
"latency_seconds": result.get("latency_seconds"),
},
})
return cases
def task(input: str) -> str:
# Pipeline already ran in data(); return the cached report JSON.
if input in results_cache:
report = results_cache[input].get("report")
return json.dumps(report, indent=2) if report else "Pipeline failed — no report produced"
# Fallback if cache miss (should not happen in normal eval flow)
result = _run_pipeline_on_input(input)
report = result.get("report")
return json.dumps(report, indent=2) if report else "Pipeline failed — no report produced"
return task, data
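# Typical wiring (mirrors main() below):
#   task_fn, data_fn = _create_wrapped_task(dataset)
#   Eval(BRAINTRUST_PROJECT_NAME, data=data_fn, task=task_fn, scores=all_scorers)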
# ---------------------------------------------------------------------------
# Reporting
# ---------------------------------------------------------------------------
def _print_eval_summary(eval_result: Any, dataset: list[dict]) -> None:
results = eval_result.results
if not results:
logger.warning("No evaluation results to summarize")
return
category_lookup = {case["input"]: case.get("category", "unknown") for case in dataset}
scorer_scores: dict[str, list[float]] = {}
category_scores: dict[str, list[float]] = {}
error_cases = []
for r in results:
input_text = str(r.input) if r.input else ""
category = category_lookup.get(input_text, "unknown")
if r.error:
error_cases.append({"input": input_text[:80], "error": str(r.error)})
continue
for scorer_name, score_val in r.scores.items():
if score_val is None:
continue
scorer_scores.setdefault(scorer_name, []).append(score_val)
category_scores.setdefault(f"{category}/{scorer_name}", []).append(score_val)
print("\n" + "=" * 80)
print("EVALUATION SUMMARY — CONTRACT ANALYSIS PIPELINE")
print("=" * 80)
print(f"Total test cases : {len(results)}")
print(f"Errors : {len(error_cases)}")
print()
print("-" * 80)
print(f"{'Scorer':<35} {'Avg':>10} {'Min':>8} {'Max':>8} {'N':>6}")
print("-" * 80)
for name in sorted(scorer_scores):
scores = scorer_scores[name]
avg = sum(scores) / len(scores)
print(f"{name:<35} {avg:>10.2%} {min(scores):>8.2f} {max(scores):>8.2f} {len(scores):>6}")
print()
print("-" * 80)
print("PER-CATEGORY BREAKDOWN")
print("-" * 80)
categories = sorted({case.get("category", "unknown") for case in dataset})
for cat in categories:
print(f"\n [{cat}]")
for name in sorted(scorer_scores):
key = f"{cat}/{name}"
if key in category_scores:
scores = category_scores[key]
print(f" {name:<33} {sum(scores)/len(scores):>8.2%} (n={len(scores)})")
if error_cases:
print()
print("-" * 80)
print("FAILED CASES")
print("-" * 80)
for case in error_cases:
print(f" Input: {case['input']}")
print(f" Error: {case['error']}")
print()
print("=" * 80 + "\n")
logger.info(
f"Eval summary: {len(results)} cases, {len(error_cases)} errors — "
+ ", ".join(
f"{k}={sum(v)/len(v):.2%}"
for k, v in sorted(scorer_scores.items())
)
)
def _export_eval_metrics(
eval_result: Any,
dataset: list[dict],
output_path: str = DEFAULT_OUTPUT_PATH,
) -> None:
results = eval_result.results
if not results:
logger.warning("No results to export")
return
category_lookup = {case["input"]: case.get("category", "unknown") for case in dataset}
scorer_scores: dict[str, list[float]] = {}
category_scores: dict[str, list[float]] = {}
per_case_results = []
error_count = 0
for r in results:
input_text = str(r.input) if r.input else ""
category = category_lookup.get(input_text, "unknown")
clause_type_meta = _clause_type_metadata.get(input_text)
case_entry: dict = {
"input_preview": input_text[:120],
"category": category,
"scores": {},
"scorer_metadata": {
"ExpectedClauseType": clause_type_meta,
} if clause_type_meta else {},
"error": None,
}
if r.error:
error_count += 1
case_entry["error"] = str(r.error)
per_case_results.append(case_entry)
continue
for scorer_name, score_val in r.scores.items():
if score_val is None:
continue
case_entry["scores"][scorer_name] = round(score_val, 4)
scorer_scores.setdefault(scorer_name, []).append(score_val)
category_scores.setdefault(f"{category}/{scorer_name}", []).append(score_val)
per_case_results.append(case_entry)
overall = {
name: {
"average": round(sum(scores) / len(scores), 4),
"min": round(min(scores), 4),
"max": round(max(scores), 4),
"count": len(scores),
}
for name, scores in sorted(scorer_scores.items())
}
categories = sorted({case.get("category", "unknown") for case in dataset})
per_category: dict[str, dict] = {}
for cat in categories:
per_category[cat] = {}
for name in sorted(scorer_scores):
key = f"{cat}/{name}"
if key in category_scores:
scores = category_scores[key]
per_category[cat][name] = {
"average": round(sum(scores) / len(scores), 4),
"count": len(scores),
}
metrics = {
"total_cases": len(results),
"errors": error_count,
"overall_scores": overall,
"per_category": per_category,
"per_case": per_case_results,
}
with open(output_path, "w") as f:
json.dump(metrics, f, indent=2, default=str)
logger.info(f"Evaluation metrics exported to {output_path}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run Braintrust evaluations on the Contract Analysis Pipeline",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Example usage:
python agent-evaluation/eval.py
python agent-evaluation/eval.py --dataset eval_dataset.json --output eval_metrics.json
python agent-evaluation/eval.py --no-send-logs
python agent-evaluation/eval.py --debug
""",
)
parser.add_argument(
"--dataset", type=str, default=DEFAULT_DATASET_PATH,
help=f"Path to evaluation dataset JSON (default: {DEFAULT_DATASET_PATH})",
)
parser.add_argument(
"--output", type=str, default=DEFAULT_OUTPUT_PATH,
help=f"Path for output eval metrics JSON (default: {DEFAULT_OUTPUT_PATH})",
)
parser.add_argument(
"--no-send-logs", action="store_true",
help="Run evaluations locally without sending results to Braintrust",
)
parser.add_argument(
"--experiment-name", type=str, default=None,
help="Name for this evaluation experiment (default: auto-generated)",
)
parser.add_argument(
"--debug", action="store_true",
help="Enable debug logging",
)
return parser.parse_args()
def main() -> None:
args = _parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
logger.info("Starting Contract Analysis Pipeline Evaluations")
start = time.time()
dataset = _load_dataset(args.dataset)
task_fn, data_fn = _create_wrapped_task(dataset)
# Use Claude Sonnet 4.6 as the judge model via Anthropic's OpenAI-compatible endpoint
judge_client = _create_judge_client()
    # Scorers:
    # - LLM-as-judge (Factuality) — assesses overall report quality vs expected
    # - Heuristic per-agent scorers — validate structure and field validity at each stage
all_scorers = [
Factuality(model=EVAL_JUDGE_MODEL, client=judge_client),
clause_structure_validity_scorer, # ingestion agent
classification_validity_scorer, # classification agent
_expected_clause_type_scorer_with_cache, # classification agent
risk_score_validity_scorer, # risk analysis agent
risk_factors_presence_scorer, # risk analysis agent
benchmark_similarity_validity_scorer, # benchmark agent
output_structure_validity_scorer, # report node
latency_scorer, # pipeline-level
no_error_scorer, # pipeline-level
]
eval_kwargs: dict[str, Any] = {
"data": data_fn,
"task": task_fn,
"scores": all_scorers,
}
if args.experiment_name:
eval_kwargs["experiment_name"] = args.experiment_name
if args.no_send_logs:
eval_kwargs["no_send_logs"] = True
logger.info("Running in local mode (no logs sent to Braintrust)")
logger.info("Running Braintrust evaluation...")
eval_result = Eval(BRAINTRUST_PROJECT_NAME, **eval_kwargs)
_print_eval_summary(eval_result, dataset)
_export_eval_metrics(eval_result, dataset, output_path=args.output)
elapsed = time.time() - start
minutes = int(elapsed // 60)
seconds = elapsed % 60
if minutes > 0:
logger.info(f"Evaluation completed in {minutes}m {seconds:.1f}s")
else:
logger.info(f"Evaluation completed in {seconds:.1f}s")
if __name__ == "__main__":
main()