commitment-os / evaluation /evaluate_llm_checkpoints.py
jayantaggarwal-sketch
Sync latest project updates to Hugging Face Space.
d53a65c
"""Evaluate base vs RL-trained LLM checkpoints on CommitmentOS.
This script runs the SAME protocol for two local-loading model setups:
- baseline model loaded from a Hugging Face model ID
- trained model loaded from a local LoRA adapter path on top of that base model
It writes judge-friendly artifacts under artifacts/evals_llm/.
"""
from __future__ import annotations
import csv
import gc
import json
import os
import sys
import uuid
from pathlib import Path
from statistics import mean, median
from typing import Any
import requests
from dotenv import load_dotenv
from pydantic import ValidationError
# Two directory levels up from this file (the parent of the directory that
# contains this script). It is placed on sys.path so that the project-level
# `models` module imported just below can be resolved.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
# Must come after the sys.path tweak above.
from models import CommitmentAction
# Output directory for every artifact this script writes. The path is relative,
# so it resolves against the current working directory. It is created at import
# time.
ARTIFACT_DIR = Path("artifacts/evals_llm")
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
# Load a .env file before the os.getenv() calls below read the configuration.
load_dotenv()
# Base URL of the environment server that exposes /tasks, /reset, /step, /state.
ENV_BASE_URL = os.getenv("ENV_BASE_URL", "https://jayant2304-commitment-os.hf.space")
# An empty or whitespace-only token is normalised to None.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip() or None
# Both model settings default to "" here; _require_env() rejects empty values.
BASELINE_MODEL = os.getenv("BASELINE_MODEL_NAME", "").strip()
TRAINED_MODEL_PATH = os.getenv("TRAINED_MODEL_PATH", "").strip()
# Seed sent with every /reset request.
EVAL_SEED = int(os.getenv("EVAL_SEED", "42"))
# Maximum number of model-chosen actions per task before a forced submit_plan.
MAX_STEPS = int(os.getenv("EVAL_MAX_STEPS", "12"))
# Decoding settings: a temperature of 0 selects greedy decoding; TOP_P is only
# used when sampling is enabled.
TEMPERATURE = float(os.getenv("EVAL_TEMPERATURE", "0.0"))
TOP_P = float(os.getenv("EVAL_TOP_P", "1.0"))
MAX_NEW_TOKENS = int(os.getenv("EVAL_MAX_NEW_TOKENS", "256"))
# A task counts as a success when its final reward is >= this value.
SUCCESS_THRESHOLD = float(os.getenv("EVAL_SUCCESS_THRESHOLD", "0.6"))
# System message sent as the first chat turn of every episode. It lists the
# JSON tool-call shapes the model may emit and the rules it is asked to follow.
# This text is part of the evaluation protocol: editing it changes what both
# models are asked to do.
SYSTEM_PROMPT = """You are an expert executive assistant AI. You manage calendars, emails, and dining reservations.
You will be given a scenario briefing describing a situation with calendar conflicts, emails, or planning tasks.
For each turn, you must respond with EXACTLY ONE JSON object choosing a tool to call:
Available tools:
- {"action_type": "view_calendar", "date": "2026-04-25"}
- {"action_type": "check_availability", "person": "Client_Jones"}
- {"action_type": "search_restaurants", "cuisine": "Italian", "max_price": 50, "dietary": "vegetarian", "max_distance_miles": 3.0, "near_airport": false}
- {"action_type": "schedule_meeting", "title": "Demo", "date": "2026-04-25", "time": "14:00", "duration_min": 60, "participants": ["Client_Jones"], "location": "Room A"}
- {"action_type": "reschedule_event", "event_id": "evt_1", "new_time": "15:00"}
- {"action_type": "cancel_event", "event_id": "evt_1"}
- {"action_type": "send_email", "to": "VP_Chen", "subject": "Meeting update", "body": "Hi, I need to reschedule..."}
- {"action_type": "book_restaurant", "restaurant_name": "Sky Lounge"}
- {"action_type": "submit_plan"}
IMPORTANT RULES:
1. Respond with ONLY a JSON object, no markdown, no explanation
2. Handle higher-priority items before lower-priority ones
3. When cancelling or rescheduling commitments, ALWAYS send an email to affected parties BEFORE submitting
4. Call submit_plan when you have resolved all issues
5. Never silently drop a commitment — always notify the affected person"""
def _require_env() -> None:
    """Check the model settings before any expensive work begins.

    Raises:
        RuntimeError: if ``BASELINE_MODEL`` or ``TRAINED_MODEL_PATH`` is empty,
            or if ``TRAINED_MODEL_PATH`` does not exist on disk.
    """
    mandatory_settings = (
        (BASELINE_MODEL, "Set BASELINE_MODEL_NAME"),
        (TRAINED_MODEL_PATH, "Set TRAINED_MODEL_PATH"),
    )
    for configured_value, complaint in mandatory_settings:
        if not configured_value:
            raise RuntimeError(complaint)

    adapter_location = Path(TRAINED_MODEL_PATH)
    if not adapter_location.exists():
        raise RuntimeError(f"TRAINED_MODEL_PATH does not exist: {TRAINED_MODEL_PATH}")
def _load_runtime_deps() -> tuple[Any, Any, Any, Any]:
    """Import the heavy ML libraries on demand.

    Returns:
        ``(torch, AutoModelForCausalLM, AutoTokenizer, PeftModel)``.

    Raises:
        RuntimeError: if any of the libraries cannot be imported; the message
            carries install instructions and the original ``ImportError`` is
            chained as the cause.
    """
    install_hint = (
        "Missing evaluation dependencies. From the repo root: "
        'pip install -e ".[llm-eval]"'
        " (or: pip install transformers peft accelerate torch sentencepiece)"
    )
    try:
        import torch
        from peft import PeftModel
        from transformers import AutoModelForCausalLM, AutoTokenizer
    except ImportError as import_failure:
        raise RuntimeError(install_hint) from import_failure
    return torch, AutoModelForCausalLM, AutoTokenizer, PeftModel
def _get_task_ids() -> list[str]:
    """Fetch the task IDs from the environment's ``/tasks`` endpoint.

    The response is read as a mapping keyed by difficulty. IDs are returned
    with all "easy" tasks first, then "medium", then "hard"; a difficulty
    missing from the response contributes nothing.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
    """
    response = requests.get(f"{ENV_BASE_URL}/tasks", timeout=30)
    response.raise_for_status()
    tasks_by_level = response.json()
    return [
        task_id
        for level in ("easy", "medium", "hard")
        for task_id in tasks_by_level.get(level, [])
    ]
def _parse_action(text: str) -> dict[str, Any]:
text = (text or "").strip()
if text.startswith("```"):
lines = text.split("\n")
text = "\n".join(lines[1:-1]) if len(lines) > 2 else lines[0]
try:
action = json.loads(text)
if isinstance(action, dict) and action.get("action_type"):
return action
except json.JSONDecodeError:
pass
return {"action_type": "submit_plan"}
def _normalize_action(action: dict[str, Any]) -> dict[str, Any]:
    """Coerce a loosely-typed action dict into a validated action payload.

    Keys that are not fields of ``CommitmentAction`` are dropped. A
    comma-separated ``participants`` string becomes a list of non-empty,
    stripped names. Numeric fields are cast (and removed when the cast
    fails), and a string ``near_airport`` is read as a boolean.

    Returns:
        ``model_dump()`` of the validated action, or of a ``submit_plan``
        action when validation fails.
    """
    known_fields = set(CommitmentAction.model_fields.keys())
    cleaned = {key: value for key, value in action.items() if key in known_fields}

    raw_participants = cleaned.get("participants")
    if isinstance(raw_participants, str):
        stripped_names = (piece.strip() for piece in raw_participants.split(","))
        cleaned["participants"] = [name for name in stripped_names if name]

    # Field -> constructor used to coerce it; an uncoercible value is dropped
    # so that validation sees the field as absent.
    numeric_casts = (
        ("duration_min", int),
        ("max_price", int),
        ("max_distance_miles", float),
    )
    for field_name, caster in numeric_casts:
        if field_name not in cleaned:
            continue
        try:
            cleaned[field_name] = caster(cleaned[field_name])
        except (TypeError, ValueError):
            cleaned.pop(field_name, None)

    airport_flag = cleaned.get("near_airport")
    if isinstance(airport_flag, str):
        cleaned["near_airport"] = airport_flag.strip().lower() in {"true", "1", "yes"}

    try:
        return CommitmentAction.model_validate(cleaned).model_dump()
    except ValidationError:
        safe_default = CommitmentAction(action_type="submit_plan")
        return safe_default.model_dump()
def _dtype_and_device(torch_mod: Any) -> tuple[Any, str | None]:
if not torch_mod.cuda.is_available():
return torch_mod.float32, None
if torch_mod.cuda.is_bf16_supported():
return torch_mod.bfloat16, "auto"
return torch_mod.float16, "auto"
def _path_has_tokenizer_files(path: Path) -> bool:
tokenizer_files = {
"tokenizer.json",
"tokenizer_config.json",
"special_tokens_map.json",
"vocab.json",
"merges.txt",
"spiece.model",
}
return any((path / file_name).exists() for file_name in tokenizer_files)
class LocalChatModel:
    """A locally loaded causal LM together with its tokenizer.

    Attributes are set by ``__init__``:
        display_name: Label used in logs, episode IDs and result rows.
        tokenizer: Tokenizer providing ``apply_chat_template`` and ``decode``.
        model: The loaded model; set to ``None`` by :meth:`unload`.
        torch: The imported ``torch`` module.
    """

    def __init__(
        self,
        *,
        display_name: str,
        tokenizer: Any,
        model: Any,
        torch_mod: Any,
    ) -> None:
        self.display_name = display_name
        self.tokenizer = tokenizer
        self.model = model
        self.torch = torch_mod

    def generate_action(self, messages: list[dict[str, str]]) -> tuple[dict[str, Any], str]:
        """Generate the next action for a chat history.

        Args:
            messages: Chat turns as ``{"role": ..., "content": ...}`` dicts.

        Returns:
            ``(action, raw)`` where ``raw`` is the stripped decoded completion
            and ``action`` is ``raw`` passed through ``_parse_action`` and
            ``_normalize_action``.
        """
        prompt = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )
        # The rendered chat template already contains the special tokens it
        # needs. Re-tokenizing with the default add_special_tokens=True would
        # prepend them a second time (e.g. a duplicate BOS) on tokenizers that
        # add such tokens.
        inputs = self.tokenizer(prompt, return_tensors="pt", add_special_tokens=False)
        # Move the inputs to the device of the model's first parameter.
        target_device = next(self.model.parameters()).device
        inputs = {k: v.to(target_device) for k, v in inputs.items()}

        generation_kwargs: dict[str, Any] = {
            "max_new_tokens": MAX_NEW_TOKENS,
            "pad_token_id": self.tokenizer.pad_token_id,
            "eos_token_id": self.tokenizer.eos_token_id,
        }
        if TEMPERATURE > 0:
            generation_kwargs.update(
                {
                    "do_sample": True,
                    "temperature": TEMPERATURE,
                    "top_p": TOP_P,
                }
            )
        else:
            # Temperature 0 means greedy decoding; sampling parameters are
            # deliberately not passed.
            generation_kwargs["do_sample"] = False

        with self.torch.inference_mode():
            output_ids = self.model.generate(**inputs, **generation_kwargs)

        # generate() returns prompt + completion; keep only the completion.
        prompt_len = inputs["input_ids"].shape[-1]
        new_tokens = output_ids[0][prompt_len:]
        raw = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
        return _normalize_action(_parse_action(raw)), raw

    def unload(self) -> None:
        """Drop the model reference and release cached CUDA memory.

        Safe to call more than once: the attribute is reset to ``None``
        instead of being deleted, so a repeated call does not raise.
        """
        self.model = None
        gc.collect()
        if self.torch.cuda.is_available():
            self.torch.cuda.empty_cache()
def _load_tokenizer(AutoTokenizer: Any, model_or_path: str | Path) -> Any:
    """Load a tokenizer and make sure it has a pad token.

    Args:
        AutoTokenizer: The tokenizer class to call ``from_pretrained`` on.
        model_or_path: Model ID or local directory to load from.

    Returns:
        The tokenizer; when it defines no pad token, the EOS token is used as
        the pad token.
    """
    # NOTE: trust_remote_code=True permits running code shipped with the
    # model repository; only point this at sources you trust.
    load_options = {"trust_remote_code": True, "token": HF_TOKEN}
    loaded = AutoTokenizer.from_pretrained(model_or_path, **load_options)
    if loaded.pad_token is None:
        loaded.pad_token = loaded.eos_token
    return loaded
def load_baseline_model() -> LocalChatModel:
    """Load the baseline model named by ``BASELINE_MODEL`` in eval mode.

    Returns:
        A ``LocalChatModel`` whose display name is ``BASELINE_MODEL``.
    """
    torch_mod, causal_lm_cls, tokenizer_cls, _peft_unused = _load_runtime_deps()
    weights_dtype, placement = _dtype_and_device(torch_mod)

    baseline_tokenizer = _load_tokenizer(tokenizer_cls, BASELINE_MODEL)
    baseline_net = causal_lm_cls.from_pretrained(
        BASELINE_MODEL,
        trust_remote_code=True,
        token=HF_TOKEN,
        dtype=weights_dtype,
        device_map=placement,
    )
    baseline_net.eval()

    return LocalChatModel(
        display_name=BASELINE_MODEL,
        tokenizer=baseline_tokenizer,
        model=baseline_net,
        torch_mod=torch_mod,
    )
def load_trained_model() -> LocalChatModel:
    """Load the baseline model and apply the adapter at ``TRAINED_MODEL_PATH``.

    The tokenizer comes from the adapter directory when that directory holds
    tokenizer files, and from ``BASELINE_MODEL`` otherwise.

    Returns:
        A ``LocalChatModel`` whose display name is the adapter path.
    """
    torch_mod, causal_lm_cls, tokenizer_cls, peft_model_cls = _load_runtime_deps()
    weights_dtype, placement = _dtype_and_device(torch_mod)
    adapter_dir = Path(TRAINED_MODEL_PATH)

    tokenizer_origin: str | Path
    if _path_has_tokenizer_files(adapter_dir):
        tokenizer_origin = adapter_dir
    else:
        tokenizer_origin = BASELINE_MODEL
    adapter_tokenizer = _load_tokenizer(tokenizer_cls, tokenizer_origin)

    foundation = causal_lm_cls.from_pretrained(
        BASELINE_MODEL,
        trust_remote_code=True,
        token=HF_TOKEN,
        dtype=weights_dtype,
        device_map=placement,
    )
    adapted = peft_model_cls.from_pretrained(foundation, adapter_dir)
    adapted.eval()

    return LocalChatModel(
        display_name=str(adapter_dir),
        tokenizer=adapter_tokenizer,
        model=adapted,
        torch_mod=torch_mod,
    )
def _env_reset(task_id: str, episode_id: str) -> dict[str, Any]:
    """Start an episode via ``POST /reset`` and return its first observation.

    Returns:
        The ``"observation"`` entry of the response, or the whole response
        body when that key is absent.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    query = {"task_id": task_id, "seed": EVAL_SEED, "episode_id": episode_id}
    response = requests.post(f"{ENV_BASE_URL}/reset", params=query, timeout=30)
    response.raise_for_status()
    body = response.json()
    return body.get("observation", body)
def _env_step(action: dict[str, Any], episode_id: str) -> dict[str, Any]:
    """Send one action via ``POST /step`` and return the resulting observation.

    The returned dict always carries ``"done"`` and a float ``"reward"``.
    Values at the top level of the response take precedence over the ones
    inside the observation, and a falsy reward becomes ``0.0``.

    Raises:
        requests.HTTPError: on any status >= 400; the message includes the
            response body.
    """
    response = requests.post(
        f"{ENV_BASE_URL}/step",
        params={"episode_id": episode_id},
        json={"action": action},
        timeout=30,
    )
    if response.status_code >= 400:
        failure_detail = f"{response.status_code} {response.reason}: {response.text}"
        raise requests.HTTPError(failure_detail, response=response)

    body = response.json()
    observation = body.get("observation", body)

    done_inside_obs = observation.get("done", False)
    observation["done"] = body.get("done", done_inside_obs)

    reward_inside_obs = observation.get("reward", 0.0)
    reported_reward = body.get("reward", reward_inside_obs)
    observation["reward"] = float(reported_reward or 0.0)
    return observation
def _env_state(episode_id: str) -> dict[str, Any]:
    """Return the JSON body of ``GET /state`` for the given episode.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    state_url = f"{ENV_BASE_URL}/state"
    response = requests.get(state_url, params={"episode_id": episode_id}, timeout=30)
    response.raise_for_status()
    return response.json()
def run_task(chat_model: LocalChatModel, task_id: str) -> dict[str, Any]:
    """Run one episode of *task_id* with *chat_model* and collect the outcome.

    The model acts for at most ``MAX_STEPS`` turns. If the environment has not
    reported ``done`` by then, a ``submit_plan`` action is sent on the model's
    behalf and recorded in the trace.

    Returns:
        A result row with the final reward, success flag, step and violation
        counts (read from ``/state``), reward breakdown, feedback and the
        per-step trace.
    """
    model_slug = chat_model.display_name.replace("/", "-").replace(" ", "_")
    episode_id = f"eval-{model_slug}-{task_id}-{uuid.uuid4().hex[:8]}"

    first_obs = _env_reset(task_id, episode_id)
    briefing = first_obs.get("briefing", "")
    calendar = json.dumps(first_obs.get("calendar_snapshot", []), indent=2)
    inbox = json.dumps(first_obs.get("inbox", []), indent=2)
    opening_request = (
        f"SCENARIO: {briefing}\n\n"
        f"CALENDAR:\n{calendar}\n\n"
        f"INBOX:\n{inbox}\n\n"
        "What is your first action?"
    )
    messages: list[dict[str, str]] = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": opening_request},
    ]

    def trace_entry(
        step: int,
        action: dict[str, Any],
        raw_output: str,
        observation: dict[str, Any],
        finished: bool,
    ) -> dict[str, Any]:
        # One row of the per-step trace stored in the result.
        return {
            "step": step,
            "action": action,
            "raw_model_output": raw_output,
            "reward": float(observation.get("reward", 0.0)),
            "done": finished,
            "tool_result": observation.get("tool_result", ""),
        }

    trace: list[dict[str, Any]] = []
    step_num = 0
    done = False
    final_obs: dict[str, Any] = first_obs

    for step_num in range(1, MAX_STEPS + 1):
        action, raw = chat_model.generate_action(messages)
        final_obs = _env_step(action, episode_id)
        done = bool(final_obs.get("done", False))
        trace.append(trace_entry(step_num, action, raw, final_obs, done))
        if done:
            break
        tool_feedback = final_obs.get("tool_result", "")
        messages.extend(
            [
                {"role": "assistant", "content": raw},
                {"role": "user", "content": f"TOOL RESULT: {tool_feedback}\n\nWhat is your next action?"},
            ]
        )

    if not done:
        # Step budget exhausted: close the episode with a forced submit.
        forced_submit = {"action_type": "submit_plan"}
        final_obs = _env_step(forced_submit, episode_id)
        step_num += 1
        trace.append(
            trace_entry(step_num, forced_submit, '{"action_type":"submit_plan"}', final_obs, True)
        )

    state = _env_state(episode_id)
    final_reward = float(final_obs.get("reward", 0.0))
    return {
        "task_id": task_id,
        "difficulty": final_obs.get("difficulty", ""),
        "model_name": chat_model.display_name,
        "final_reward": round(final_reward, 4),
        "success": final_reward >= SUCCESS_THRESHOLD,
        "steps_used": int(state.get("step_count", step_num)),
        "violation_count": int(state.get("violation_count", 0)),
        "reward_breakdown": final_obs.get("reward_breakdown", {}),
        "feedback": final_obs.get("feedback", ""),
        "trace": trace,
    }
def run_model(chat_model: LocalChatModel, task_ids: list[str]) -> list[dict[str, Any]]:
    """Run every task in *task_ids* in order, printing a progress line per task.

    Returns:
        One ``run_task`` result row per task, in input order.
    """
    total = len(task_ids)
    model_label = chat_model.display_name
    outcomes: list[dict[str, Any]] = []
    for position, task_id in enumerate(task_ids, start=1):
        print(f"[eval {model_label}] task {position}/{total}: {task_id}", flush=True)
        outcome = run_task(chat_model, task_id=task_id)
        outcomes.append(outcome)
    return outcomes
def _write_json(path: Path, payload: Any) -> None:
path.write_text(json.dumps(payload, indent=2))
def _comparison_row(base: dict[str, Any], tr: dict[str, Any]) -> dict[str, Any]:
    """Build one per-task comparison row from a baseline and a trained result."""
    return {
        "task_id": base["task_id"],
        "difficulty": base["difficulty"],
        "baseline_reward": base["final_reward"],
        "trained_reward": tr["final_reward"],
        "reward_delta": round(tr["final_reward"] - base["final_reward"], 4),
        "baseline_steps": base["steps_used"],
        "trained_steps": tr["steps_used"],
        "step_delta": tr["steps_used"] - base["steps_used"],
        "baseline_violations": base["violation_count"],
        "trained_violations": tr["violation_count"],
        "violation_delta": tr["violation_count"] - base["violation_count"],
        "baseline_success": int(base["success"]),
        "trained_success": int(tr["success"]),
    }


def _summarize_comparison(comparison_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate comparison rows into overall and per-difficulty statistics.

    *comparison_rows* must be non-empty (``statistics.mean`` rejects empty
    data); ``write_artifacts`` guarantees this.
    """

    def column(rows: list[dict[str, Any]], key: str) -> list[Any]:
        return [row[key] for row in rows]

    baseline_rewards = column(comparison_rows, "baseline_reward")
    trained_rewards = column(comparison_rows, "trained_reward")
    reward_deltas = column(comparison_rows, "reward_delta")
    baseline_steps = column(comparison_rows, "baseline_steps")
    trained_steps = column(comparison_rows, "trained_steps")
    baseline_violations = column(comparison_rows, "baseline_violations")
    trained_violations = column(comparison_rows, "trained_violations")
    baseline_success = column(comparison_rows, "baseline_success")
    trained_success = column(comparison_rows, "trained_success")

    summary: dict[str, Any] = {
        "task_count": len(comparison_rows),
        "baseline_mean_reward": round(mean(baseline_rewards), 4),
        "trained_mean_reward": round(mean(trained_rewards), 4),
        "mean_reward_delta": round(mean(trained_rewards) - mean(baseline_rewards), 4),
        "median_reward_delta": round(median(reward_deltas), 4),
        "baseline_success_rate": round(mean(baseline_success), 4),
        "trained_success_rate": round(mean(trained_success), 4),
        "success_rate_delta": round(mean(trained_success) - mean(baseline_success), 4),
        "baseline_mean_steps": round(mean(baseline_steps), 4),
        "trained_mean_steps": round(mean(trained_steps), 4),
        "step_delta": round(mean(trained_steps) - mean(baseline_steps), 4),
        "baseline_mean_violations": round(mean(baseline_violations), 4),
        "trained_mean_violations": round(mean(trained_violations), 4),
        "violation_delta": round(mean(trained_violations) - mean(baseline_violations), 4),
        "tasks_with_positive_reward_delta": sum(1 for x in reward_deltas if x > 0),
        "tasks_with_no_reward_delta": sum(1 for x in reward_deltas if x == 0),
        "per_difficulty": {},
    }

    for difficulty in ("easy", "medium", "hard"):
        subset = [r for r in comparison_rows if r["difficulty"] == difficulty]
        if not subset:
            continue
        sub_base_reward = mean(column(subset, "baseline_reward"))
        sub_trained_reward = mean(column(subset, "trained_reward"))
        sub_base_steps = mean(column(subset, "baseline_steps"))
        sub_trained_steps = mean(column(subset, "trained_steps"))
        summary["per_difficulty"][difficulty] = {
            "count": len(subset),
            "baseline_mean_reward": round(sub_base_reward, 4),
            "trained_mean_reward": round(sub_trained_reward, 4),
            "reward_delta": round(sub_trained_reward - sub_base_reward, 4),
            "baseline_mean_steps": round(sub_base_steps, 4),
            "trained_mean_steps": round(sub_trained_steps, 4),
            "step_delta": round(sub_trained_steps - sub_base_steps, 4),
        }
    return summary


def write_artifacts(baseline: list[dict[str, Any]], trained: list[dict[str, Any]]) -> None:
    """Write all comparison artifacts for a baseline/trained result pair.

    Files written under ``ARTIFACT_DIR``:
        - ``baseline_llm_eval.json`` / ``trained_llm_eval.json``: raw rows.
        - ``llm_eval_protocol.json``: the settings used for the run.
        - ``llm_comparison.csv``: one row per task.
        - ``llm_summary.json``: aggregate statistics.
        - ``llm_case_study_hard_015.md``: only when both result lists contain
          task ``hard_015``.

    Args:
        baseline: Result rows from ``run_task`` for the baseline model.
        trained: Result rows from ``run_task`` for the trained model.

    Raises:
        ValueError: if *baseline* is empty, or if a baseline task has no
            matching row in *trained*. Nothing is written in either case.
    """
    if not baseline:
        raise ValueError("write_artifacts needs at least one baseline result; no tasks were evaluated")

    by_task = {row["task_id"]: row for row in trained}
    missing = [row["task_id"] for row in baseline if row["task_id"] not in by_task]
    if missing:
        raise ValueError(f"trained results are missing tasks present in baseline: {missing}")

    comparison_rows = [_comparison_row(base, by_task[base["task_id"]]) for base in baseline]

    _write_json(ARTIFACT_DIR / "baseline_llm_eval.json", baseline)
    _write_json(ARTIFACT_DIR / "trained_llm_eval.json", trained)
    _write_json(
        ARTIFACT_DIR / "llm_eval_protocol.json",
        {
            "task_set": "easy_001..hard_015",
            # The label above is static; also record what was actually run.
            "task_ids": [row["task_id"] for row in comparison_rows],
            "seed": EVAL_SEED,
            "max_steps": MAX_STEPS,
            "decode_config": {
                "temperature": TEMPERATURE,
                "top_p": TOP_P,
                "max_new_tokens": MAX_NEW_TOKENS,
            },
            "env_base_url": ENV_BASE_URL,
            "baseline_model_name": BASELINE_MODEL,
            "trained_model_path": TRAINED_MODEL_PATH,
            "success_threshold": SUCCESS_THRESHOLD,
        },
    )

    # Explicit UTF-8 so the output does not depend on the platform's locale.
    with (ARTIFACT_DIR / "llm_comparison.csv").open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(comparison_rows[0].keys()))
        writer.writeheader()
        writer.writerows(comparison_rows)

    _write_json(ARTIFACT_DIR / "llm_summary.json", _summarize_comparison(comparison_rows))

    target_task = "hard_015"
    base_case = next((r for r in baseline if r["task_id"] == target_task), None)
    tr_case = next((r for r in trained if r["task_id"] == target_task), None)
    if base_case and tr_case:
        case_study = f"""# LLM Case Study: {target_task}
## Baseline model ({BASELINE_MODEL})
- Reward: {base_case['final_reward']:.4f}
- Steps: {base_case['steps_used']}
- Violations: {base_case['violation_count']}
- Feedback: {base_case['feedback']}
## Trained model ({TRAINED_MODEL_PATH})
- Reward: {tr_case['final_reward']:.4f}
- Steps: {tr_case['steps_used']}
- Violations: {tr_case['violation_count']}
- Feedback: {tr_case['feedback']}
"""
        # Feedback text comes from the environment and may contain non-ASCII
        # characters, so the encoding is fixed rather than locale-dependent.
        (ARTIFACT_DIR / f"llm_case_study_{target_task}.md").write_text(case_study, encoding="utf-8")
def _print_summary() -> None:
    """Print the headline numbers from ``llm_summary.json`` in ``ARTIFACT_DIR``."""
    summary = json.loads((ARTIFACT_DIR / "llm_summary.json").read_text())

    # (label, summary key, format spec); deltas are printed with an explicit sign.
    report_lines = (
        ("Baseline mean reward", "baseline_mean_reward", ".4f"),
        ("Trained mean reward", "trained_mean_reward", ".4f"),
        ("Reward delta", "mean_reward_delta", "+.4f"),
        ("Baseline success", "baseline_success_rate", ".4f"),
        ("Trained success", "trained_success_rate", ".4f"),
        ("Success delta", "success_rate_delta", "+.4f"),
    )
    print("\nCheckpoint comparison summary")
    for label, key, spec in report_lines:
        print(f"{label}: {format(summary[key], spec)}")
def main() -> None:
    """Evaluate the baseline and then the trained model, and write the artifacts.

    Each model is loaded, run over every task, and unloaded before the next
    one is loaded, so the two models are not loaded at the same time.
    """
    _require_env()
    task_ids = _get_task_ids()
    print(f"CommitmentOS LLM eval: {len(task_ids)} tasks, env={ENV_BASE_URL}", flush=True)

    # (message before loading, message before running, loader), in run order.
    phases = (
        ("Loading baseline model…", "Running baseline…", load_baseline_model),
        ("Loading trained adapter…", "Running trained…", load_trained_model),
    )
    results_per_phase = []
    for loading_note, running_note, loader in phases:
        print(loading_note, flush=True)
        chat_model = loader()
        print(running_note, flush=True)
        results_per_phase.append(run_model(chat_model, task_ids))
        chat_model.unload()

    baseline_results, trained_results = results_per_phase
    write_artifacts(baseline_results, trained_results)
    print("Wrote LLM checkpoint artifacts to", ARTIFACT_DIR)
    _print_summary()


if __name__ == "__main__":
    main()