# Earning_lens / evaluate.py
"""
Evaluate inference over N random episodes (default 100).
Requires OpenEnv server and OPENAI_API_KEY. The server's active task (``EARNINGS_ANALYST_TASK_ID``)
must match what you are measuring; ``--task`` here selects which task spec is used for report labels.
Usage:
uv run python evaluate.py
uv run python evaluate.py --samples 50 --quiet
uv run python evaluate.py --samples 10 -o results.csv
"""
from __future__ import annotations
import argparse
import asyncio
import csv
import os
import sys
from collections import defaultdict
from dotenv import load_dotenv
from earnings_analyst.environment_config import DEFAULT_TASK, TASKS
from inference import run_episode
load_dotenv()
def _normalize_label(label_text: str) -> str:
return str(label_text).strip().lower()
def exact_match(predicted_label: str, ground_truth_label: str) -> bool:
    """Return True when the two labels agree after trimming and lowercasing."""
    # Normalization inlined here (same str -> strip -> lower canonical form).
    predicted_canonical = str(predicted_label).strip().lower()
    truth_canonical = str(ground_truth_label).strip().lower()
    return predicted_canonical == truth_canonical
def confusion_key(predicted_label: str, ground_truth_label: str) -> tuple[str, str]:
    """(predicted, ground_truth) pair in canonical lowercase form.

    Used as the key of the confusion-count dictionary.
    """
    # Normalization inlined here (same str -> strip -> lower canonical form).
    pred = str(predicted_label).strip().lower()
    truth = str(ground_truth_label).strip().lower()
    return pred, truth
# Column order for the optional per-episode CSV report (written via -o/--output).
# Order is load-bearing: csv.DictWriter emits the header and rows in this order.
_CSV_FIELDNAMES = (
    "sample_index",
    "task_id",
    "model",
    "predicted",
    "ground_truth",
    "exact_match",
    "reward",
    "done",
    "model_response",
)
def _write_episode_csv(
    output_path: str,
    csv_rows: list[dict[str, str | int | float | bool]],
) -> None:
    """Write one row per episode to *output_path* as UTF-8 CSV with a header."""
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_FIELDNAMES)
        writer.writeheader()
        writer.writerows(csv_rows)
    print(f"\nWrote {len(csv_rows)} row(s) to {output_path}")


async def run_evaluation(
    *,
    samples: int,
    base_url: str | None,
    model: str | None,
    task_id: str,
    quiet: bool,
    output_path: str | None,
) -> None:
    """Run *samples* episodes and report reward / exact-match statistics.

    Prints a summary (mean reward, exact accuracy, per-label rates, confusion
    counts) to stdout and optionally writes a per-episode CSV.

    Args:
        samples: Number of episodes to evaluate.
        base_url: OpenEnv server URL forwarded to ``run_episode``.
        model: Model override forwarded to ``run_episode``; the report column
            falls back to ``OPENAI_MODEL`` or ``"gpt-4o"`` when None.
        task_id: Task spec id used for the per-label report; must match the
            server's active task for the label list to be meaningful.
        quiet: Suppress per-episode console output.
        output_path: Destination for the optional CSV report (UTF-8).
    """
    spec = TASKS.get(task_id)
    if spec is None:
        # A typo'd --task would otherwise silently report default-task labels
        # while the CSV still records the requested task id.
        print(
            f"warning: unknown task id {task_id!r}; using {DEFAULT_TASK!r} labels",
            file=sys.stderr,
        )
        spec = TASKS[DEFAULT_TASK]
    label_values = list(spec["label_values"])
    resolved_model = model or os.environ.get("OPENAI_MODEL", "gpt-4o")

    rewards: list[float] = []
    exact_match_count = 0
    confusion: dict[tuple[str, str], int] = defaultdict(int)
    per_ground_truth_label: dict[str, dict[str, int]] = defaultdict(
        lambda: {"n": 0, "correct": 0}
    )
    csv_rows: list[dict[str, str | int | float | bool]] = []

    for episode_index in range(samples):
        if not quiet:
            print(f"episode {episode_index + 1}/{samples} ...", flush=True)
        episode_result = await run_episode(
            base_url=base_url,
            model=model,
            # Fix: was hard-coded to True, so --quiet never silenced the
            # per-episode inference logging.
            verbose=not quiet,
        )
        # Treat a missing reward as 0.0 so aggregation never hits None.
        episode_reward = float(
            episode_result.reward if episode_result.reward is not None else 0.0
        )
        rewards.append(episode_reward)

        ground_truth_label = episode_result.ground_truth
        predicted_label = episode_result.predicted
        is_exact = exact_match(predicted_label, ground_truth_label)
        if is_exact:
            exact_match_count += 1
        confusion[confusion_key(predicted_label, ground_truth_label)] += 1

        normalized_ground_truth = _normalize_label(ground_truth_label)
        per_ground_truth_label[normalized_ground_truth]["n"] += 1
        if is_exact:
            per_ground_truth_label[normalized_ground_truth]["correct"] += 1

        # Console-only episode summary; safe to remove.
        if not quiet:
            print(f"\n--- Episode {episode_index + 1}/{samples} Summary ---")
            print(f"Reward: {episode_reward:.4f}")
            print(f"Predicted: {predicted_label}")
            print(f"Ground Truth: {ground_truth_label}")
            print(f"Model Response: {episode_result.model_response_text}")
            print("-" * 40)

        csv_rows.append(
            {
                "sample_index": episode_index + 1,
                "task_id": task_id,
                "model": resolved_model,
                "predicted": predicted_label,
                "ground_truth": ground_truth_label,
                "exact_match": is_exact,
                "reward": episode_reward,
                "done": episode_result.done,
                "model_response": episode_result.model_response_text or "",
            }
        )

    mean_reward = sum(rewards) / len(rewards) if rewards else 0.0
    exact_accuracy = exact_match_count / samples if samples else 0.0
    print("\n=== Evaluation summary ===")
    print(f"samples: {samples}")
    print(f"mean_reward: {mean_reward:.4f}")
    print(f"exact_accuracy: {exact_accuracy:.4f} ({exact_match_count}/{samples})")

    if label_values:
        print("\nPer ground-truth label (exact match rate):")
        for lab in label_values:
            normalized_key = _normalize_label(lab)
            row = per_ground_truth_label.get(normalized_key, {"n": 0, "correct": 0})
            total_count, correct_count = row["n"], row["correct"]
            rate = (correct_count / total_count) if total_count else 0.0
            print(f" {lab!r}: {rate:.4f} ({correct_count}/{total_count})")
    else:
        print(
            "\n(No label_values in selected task spec — add them when the task is implemented.)"
        )

    # Group confusion counts by ground truth so each line reads
    # "truth=X: 'pred_a':n, 'pred_b':m".
    print("\nConfusion (predicted -> counts by ground_truth):")
    counts_by_truth: dict[str, list[tuple[str, int]]] = defaultdict(list)
    for (
        predicted_normalized,
        truth_normalized,
    ), occurrence_count in sorted(confusion.items()):
        counts_by_truth[truth_normalized].append(
            (predicted_normalized, occurrence_count)
        )
    for truth_normalized in sorted(counts_by_truth.keys()):
        parts = ", ".join(
            f"{predicted_normalized!r}:{occurrence_count}"
            for predicted_normalized, occurrence_count in sorted(
                counts_by_truth[truth_normalized]
            )
        )
        print(f" truth={truth_normalized!r}: {parts}")

    if output_path:
        _write_episode_csv(output_path, csv_rows)
def main() -> None:
    """CLI entry point: parse flags, run the evaluation, exit non-zero on failure."""
    arg_parser = argparse.ArgumentParser(
        description=(
            "Evaluate over N episodes "
            "(align server EARNINGS_ANALYST_TASK_ID with --task)"
        )
    )
    arg_parser.add_argument(
        "--samples", type=int, default=100, help="Number of episodes"
    )
    arg_parser.add_argument(
        "--base-url",
        default=os.environ.get("ENV_SERVER_URL", "http://localhost:8000"),
    )
    arg_parser.add_argument(
        "--model", default=os.environ.get("OPENAI_MODEL", "gpt-4o")
    )
    arg_parser.add_argument(
        "--task",
        default=DEFAULT_TASK,
        help="Task id for label list in report (must match server task)",
    )
    arg_parser.add_argument(
        "--quiet", action="store_true", help="Suppress per-episode lines"
    )
    arg_parser.add_argument(
        "-o",
        "--output",
        default=None,
        metavar="PATH",
        help="Write one row per episode to this CSV file (UTF-8)",
    )
    opts = arg_parser.parse_args()

    try:
        asyncio.run(
            run_evaluation(
                samples=opts.samples,
                base_url=opts.base_url,
                model=opts.model,
                task_id=opts.task,
                quiet=opts.quiet,
                output_path=opts.output,
            )
        )
    except Exception as exc:  # top-level boundary: report and exit non-zero
        print(f"error: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()