llm-agent-factory / experiments /run_experiments.py
bridges-optimal-55's picture
Initial commit
505aa09
"""
Main experiment runner.
Orchestrates all experiments on MMLU, BBH and BIG-bench:
1. Retrieval-based agent generation + evaluation
2. RAG-based agent generation + evaluation
3. AutoGen AgentBuilder-style baseline (agent spec generated by a single LLM prompt)
4. Baseline: pure model with generic system prompt (no agent generation)
Features:
- Adaptive parallel execution:
* Datasets with > LARGE_DATASET_THRESHOLD samples use MAX_PARALLEL_LARGE threads
* Smaller datasets use the regular MAX_PARALLEL threads
- Progress bars via tqdm
- Checkpoint/resume support
- Comprehensive metrics collection
"""
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

from openai import OpenAI
from tqdm import tqdm

if TYPE_CHECKING:
    from retrieval.retriever import AgentRetriever

# Add project root to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from experiments.agent_runner import AgentAnswer, retry_on_network_error, run_agent_on_sample
from experiments.benchmark_data import BenchmarkSample, load_bbh, load_bigbench, load_mmlu
from experiments.checkpoint import CheckpointManager, get_checkpoint_id
from experiments.experiment_configs import (
    ALL_CONFIGS,
    AUTOGEN_CONFIG,
    BASELINE_CONFIG,
    RAG_CONFIGS,
    RETRIEVAL_CONFIGS,
    ExperimentConfig,
    ExperimentMode,
)
from experiments.metrics import compute_metrics, print_metrics_table, save_metrics_report
# ── Constants ─────────────────────────────────────────────────────────────────
MAX_PARALLEL = 20 # Max concurrent threads for small datasets (≤ threshold)
MAX_PARALLEL_LARGE = 120 # Max concurrent threads for large datasets (> threshold)
# Each thread runs the FULL pipeline end-to-end:
# 1. Retrieval / RAG search (embedding lookup, optional rerank)
# 2. Agent generation via LLM (RAG / AutoGen modes only)
# 3. Agent execution — LLM answers the benchmark question
LARGE_DATASET_THRESHOLD = 40 # Sample count above which to use MAX_PARALLEL_LARGE
MAX_SAMPLES_PER_SUBJECT = None # None = load ALL samples (no limit)
MMLU_SUBJECTS_SUBSET = [
"abstract_algebra",
"college_computer_science",
"college_mathematics",
"conceptual_physics",
"formal_logic",
"high_school_biology",
"high_school_chemistry",
"high_school_mathematics",
"high_school_physics",
"machine_learning",
"logical_fallacies",
"global_facts",
"computer_security",
]
BBH_TASKS_SUBSET = [
"boolean_expressions",
"causal_judgement",
"date_understanding",
"disambiguation_qa",
"formal_fallacies",
"logical_deduction_three_objects",
"navigate",
"sports_understanding",
"web_of_lies",
"word_sorting",
]
BIGBENCH_TASKS_SUBSET = [
"abstract_narrative_understanding",
"anachronisms",
"causal_judgment",
"cause_and_effect",
"elementary_math_qa",
"epistemic_reasoning",
"general_knowledge",
"logical_fallacy_detection",
"odd_one_out",
"strategyqa",
]
import os
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
LLM_API_KEY = os.getenv("LLM_API_KEY", "")
# ── Generation result ─────────────────────────────────────────────────────────
@dataclass
class AgentGenResult:
    """Agent spec produced by one generation strategy, together with its cost.

    Instances are built by the ``get_agent_via_*`` functions. Token counters
    stay at their zero defaults for strategies that make no LLM call.
    """

    # Agent specification dict (agent_id, display_name, persona, description, tools, ...).
    agent_spec: dict
    # Wall-clock seconds spent producing the spec.
    gen_time: float = 0.0
    # LLM usage reported for the generation call.
    gen_prompt_tokens: int = 0
    gen_completion_tokens: int = 0
    gen_total_tokens: int = 0
# ── Retrieval-based agent generation ─────────────────────────────────────────
def get_agent_via_retrieval(
    query: str,
    config: ExperimentConfig,
    shared_retriever: "AgentRetriever | None" = None,
) -> AgentGenResult:
    """
    Retrieve an agent spec using the retrieval system.

    No LLM tokens are spent — this is a pure search over the agent index.

    Args:
        query: Text to search the agent index with.
        config: Experiment configuration; its retrieval fields are used to
            build a retriever when ``shared_retriever`` is not supplied.
        shared_retriever: Optional pre-initialized retriever. When given, the
            per-call construction and ``initialize()`` are skipped.

    Returns:
        AgentGenResult for the top-1 hit, or a generic assistant spec when the
        search returns nothing. ``gen_time`` is the search time only; token
        counters are zero.
    """
    retriever = shared_retriever
    if retriever is None:
        from retrieval.config import RetrievalConfig
        from retrieval.retriever import AgentRetriever

        retrieval_config = RetrievalConfig(
            dataset_type=config.dataset_type,
            embedding_model=config.embedding_model,
            top_k=config.top_k,
            use_reranker=config.use_reranker,
            reranker_model=config.reranker_model,
            rerank_top_k=config.rerank_top_k,
        )
        retriever = AgentRetriever(retrieval_config, verbose=False)
        # One-time setup is kept outside the timed region so gen_time measures
        # only the search, matching the shared-retriever path used in
        # run_single_experiment.
        retriever.initialize()

    t0 = time.perf_counter()
    results = retriever.search(query, top_k=1)
    retrieval_time = time.perf_counter() - t0

    if results:
        agent = results[0].agent
        spec = {
            "agent_id": agent.agent_id,
            "display_name": agent.display_name,
            "persona": agent.persona,
            "description": agent.description,
            "tools": agent.tools,
        }
    else:
        # No match in the index: fall back to a generic assistant.
        spec = {
            "agent_id": "fallback",
            "display_name": "General Assistant",
            "persona": "A helpful AI assistant.",
            "description": "Answers questions accurately.",
            "tools": [],
        }
    return AgentGenResult(agent_spec=spec, gen_time=retrieval_time)
# ── RAG-based agent generation (with token counting) ─────────────────────────
def get_agent_via_rag(
    query: str,
    config: ExperimentConfig,
    shared_retriever: "AgentRetriever | None" = None,
    shared_gen_client: "OpenAI | None" = None,
) -> AgentGenResult:
    """
    Generate an agent spec using the RAG system.

    Similar agents are retrieved as in-context examples, then one LLM call
    synthesises a single agent spec. The LLM tokens of that call are counted.

    Args:
        query: The query to generate an agent for.
        config: Experiment configuration.
        shared_retriever: Pre-initialized retriever (avoids reloading embeddings).
        shared_gen_client: Pre-initialized OpenAI client for generation.

    Returns:
        AgentGenResult with the parsed spec (missing fields filled with
        defaults), the wall-clock time of search + generation, and the token
        usage reported by the API (zeros when the response has no usage).
    """
    from retrieval.rag import SYSTEM_PROMPT, build_prompt, parse_agent_response

    retriever = shared_retriever
    if retriever is None:
        # Fallback: create a new retriever (slow, should not happen in normal flow).
        from retrieval.config import RetrievalConfig
        from retrieval.retriever import AgentRetriever

        retrieval_kwargs = {
            "dataset_type": config.dataset_type,
            "embedding_model": config.embedding_model,
            "top_k": config.top_k,
            "use_reranker": config.use_reranker,
            "rerank_top_k": config.rerank_top_k,
        }
        if config.use_reranker:
            # Honour the configured reranker instead of RetrievalConfig's default.
            retrieval_kwargs["reranker_model"] = config.reranker_model
        retriever = AgentRetriever(RetrievalConfig(**retrieval_kwargs), verbose=False)
        # One-time setup stays outside the timed region below.
        retriever.initialize()

    # --- Step 1: retrieve example agents (no LLM tokens) ---
    t0 = time.perf_counter()
    examples = retriever.search(query, top_k=config.num_retrieved_for_context)

    # --- Step 2: LLM generation (tokens are counted) ---
    prompt = build_prompt(
        query=query,
        examples=examples if config.include_examples_in_prompt else [],
        num_agents=1,
    )
    gen_client = shared_gen_client or OpenAI(
        base_url=config.agent_base_url,
        api_key=config.agent_api_key,
        timeout=120,
    )
    response = retry_on_network_error(
        gen_client.chat.completions.create,
        model=config.agent_model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        temperature=config.llm_temperature,
        max_tokens=2048,
    )
    gen_time = time.perf_counter() - t0

    # Count generation tokens
    usage = response.usage
    gen_prompt_tokens = usage.prompt_tokens if usage else 0
    gen_completion_tokens = usage.completion_tokens if usage else 0
    gen_total_tokens = usage.total_tokens if usage else 0

    message = response.choices[0].message
    # When ``content`` is empty, fall back to ``reasoning_content`` if the
    # message object carries one.
    response_text = message.content or getattr(message, "reasoning_content", None) or ""

    try:
        agents = parse_agent_response(response_text, 1)
        agent = agents[0] if agents else {}
    except Exception:
        # Best-effort: an unparseable response degrades to the defaults below.
        agent = {}
    if not isinstance(agent, dict):
        # Guard so the setdefault calls below cannot raise on an unexpected parse result.
        agent = {}

    agent.setdefault("agent_id", "rag_generated")
    agent.setdefault("display_name", "RAG Agent")
    agent.setdefault("persona", "")
    agent.setdefault("description", "")
    agent.setdefault("tools", [])

    return AgentGenResult(
        agent_spec=agent,
        gen_time=gen_time,
        gen_prompt_tokens=gen_prompt_tokens,
        gen_completion_tokens=gen_completion_tokens,
        gen_total_tokens=gen_total_tokens,
    )
# ── AutoGen AgentBuilder-style agent generation (single LLM prompt) ──────────
def _parse_json_object(text: str) -> dict:
    """Return the first JSON object embedded in *text*.

    Works for bare JSON, JSON wrapped in Markdown code fences, and JSON
    surrounded by prose. Decoding is delegated to ``json``, so nested
    objects/arrays are handled (a flat regex cannot match them).

    Raises:
        ValueError: if no JSON object can be decoded from *text*.
    """
    decoder = json.JSONDecoder()
    for start, char in enumerate(text):
        if char != "{":
            continue
        try:
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            continue
        if isinstance(obj, dict):
            return obj
    msg = "No JSON object found in AutoGen response"
    raise ValueError(msg)


def get_agent_via_autogen(
    query: str,
    config: ExperimentConfig,
    shared_gen_client: "OpenAI | None" = None,
) -> AgentGenResult:
    """
    Generate an agent spec with an AutoGen AgentBuilder-style prompt.

    No AutoGen library code runs here: one chat completion on the same LLM API
    is asked to emit a JSON agent spec, imitating AgentBuilder. Generation
    tokens are counted. Any failure (API error after retries, missing or
    invalid JSON) yields a generic fallback spec instead of raising.

    Args:
        query: Task description the agent should be specialised for.
        config: Experiment configuration (model, endpoint, API key).
        shared_gen_client: Optional pre-initialized OpenAI client.

    Returns:
        AgentGenResult with the spec, total wall-clock time, and token usage
        (zeros if the API call failed or reported no usage).
    """
    t0 = time.perf_counter()
    client = shared_gen_client or OpenAI(
        base_url=config.agent_base_url,
        api_key=config.agent_api_key,
        timeout=120,  # same bound as the other OpenAI clients in this module
    )

    # Step 1: Use LLM to generate agent spec (simulating AutoGen AgentBuilder)
    gen_prompt = (
        "You are AutoGen AgentBuilder. Given a task description, create a specialized "
        "AI agent configuration.\n\n"
        f"Task: {query}\n\n"
        "Generate a JSON agent specification with these fields:\n"
        "- agent_id: snake_case identifier\n"
        "- display_name: human-readable name\n"
        "- persona: detailed personality and expertise description\n"
        "- description: what the agent does and its capabilities\n"
        "- system_message: the system prompt this agent should use\n"
        "- tools: list of tool names (can be empty)\n\n"
        "Output ONLY valid JSON, no other text."
    )

    gen_prompt_tokens = 0
    gen_completion_tokens = 0
    gen_total_tokens = 0
    try:
        response = retry_on_network_error(
            client.chat.completions.create,
            model=config.agent_model,
            messages=[
                {
                    "role": "system",
                    "content": "You are AutoGen AgentBuilder - a system that creates specialized AI agents.",
                },
                {"role": "user", "content": gen_prompt},
            ],
            # Fixed temperature, independent of config.llm_temperature.
            temperature=0.7,
            max_tokens=1024,
        )

        # Count generation tokens
        usage = response.usage
        if usage:
            gen_prompt_tokens = usage.prompt_tokens
            gen_completion_tokens = usage.completion_tokens
            gen_total_tokens = usage.total_tokens

        message = response.choices[0].message
        # When ``content`` is empty, fall back to ``reasoning_content`` if present.
        text = message.content or getattr(message, "reasoning_content", None) or ""

        agent = _parse_json_object(text)
        agent.setdefault("agent_id", "autogen_generated")
        agent.setdefault("display_name", "AutoGen Agent")
        agent.setdefault("persona", "")
        agent.setdefault("description", "")
        agent.setdefault("tools", [])
    except Exception:
        # Deliberate best-effort: the experiment continues with a generic agent.
        agent = {
            "agent_id": "autogen_fallback",
            "display_name": "AutoGen Assistant",
            "persona": "A helpful AI assistant created by AutoGen.",
            "description": "Answers questions accurately and helpfully.",
            "tools": [],
        }

    gen_time = time.perf_counter() - t0
    return AgentGenResult(
        agent_spec=agent,
        gen_time=gen_time,
        gen_prompt_tokens=gen_prompt_tokens,
        gen_completion_tokens=gen_completion_tokens,
        gen_total_tokens=gen_total_tokens,
    )
# ── Baseline: pure model with generic system prompt ──────────────────────────
def get_agent_via_baseline(
    query: str,
    config: ExperimentConfig,
) -> AgentGenResult:
    """
    Build the fixed, generic agent spec used by the pure-model baseline.

    ``query`` and ``config`` are accepted only so the signature matches the
    other ``get_agent_via_*`` strategies; neither affects the result. Nothing
    is retrieved or generated, so gen_time and every token counter are zero.
    """
    persona = (
        "You are a helpful, accurate, and concise AI assistant. "
        "You answer questions to the best of your knowledge."
    )
    description = (
        "A general-purpose AI assistant with no specialized knowledge retrieval. "
        "Answers questions using only the base model capabilities."
    )
    return AgentGenResult(
        agent_spec={
            "agent_id": "baseline_agent",
            "display_name": "Baseline Assistant",
            "persona": persona,
            "description": description,
            "tools": [],
        },
        gen_time=0.0,
    )
# ── Process a single sample ───────────────────────────────────────────────────
def _generate_agent(
sample: BenchmarkSample,
config: ExperimentConfig,
shared_retriever: "AgentRetriever | None" = None,
shared_gen_client: "OpenAI | None" = None,
) -> AgentGenResult:
"""Generate/retrieve an agent spec for a sample."""
query_for_agent = sample.question[:200]
if config.mode == ExperimentMode.RETRIEVAL:
return get_agent_via_retrieval(query_for_agent, config)
if config.mode == ExperimentMode.RAG:
return get_agent_via_rag(query_for_agent, config, shared_retriever, shared_gen_client)
if config.mode == ExperimentMode.AUTOGEN:
return get_agent_via_autogen(query_for_agent, config, shared_gen_client)
if config.mode == ExperimentMode.BASELINE:
return get_agent_via_baseline(query_for_agent, config)
msg = f"Unknown mode: {config.mode}"
raise ValueError(msg)
def process_sample(
    sample: BenchmarkSample,
    config: ExperimentConfig,
    client: OpenAI,
    shared_retriever: "AgentRetriever | None" = None,
    shared_gen_client: "OpenAI | None" = None,
) -> AgentAnswer:
    """
    Run generation and execution for one benchmark sample.

    The agent spec is produced by the strategy selected in ``config``, the
    agent then answers the question, and the generation-phase token counts and
    time are copied onto the returned answer. ``latency_seconds`` is the sum
    of generation time and execution time.
    """
    generated = _generate_agent(sample, config, shared_retriever, shared_gen_client)

    result = run_agent_on_sample(
        agent_spec=generated.agent_spec,
        sample=sample,
        client=client,
        model=config.agent_model,
        temperature=config.agent_temperature,
        max_tokens=config.agent_max_tokens,
    )

    # Carry the generation-phase cost over so metrics see the whole pipeline.
    for token_field in ("gen_prompt_tokens", "gen_completion_tokens", "gen_total_tokens"):
        setattr(result, token_field, getattr(generated, token_field))
    result.retrieval_time = generated.gen_time
    result.latency_seconds = result.execution_time + generated.gen_time
    return result
# ── Run a single experiment config on a dataset ──────────────────────────────
def run_single_experiment(
    config: ExperimentConfig,
    samples: list[BenchmarkSample],
    dataset_name: str,
    max_parallel: int = MAX_PARALLEL,
    shared_retriever: "AgentRetriever | None" = None,
    shared_gen_client: "OpenAI | None" = None,
) -> list[AgentAnswer]:
    """
    Run a single experiment configuration on a dataset.

    Features:
      - Parallel execution with ThreadPoolExecutor
      - Progress bar via tqdm (running accuracy / error count for this run)
      - Checkpoint/resume support: samples already recorded in the checkpoint
        are skipped, and their stored results are included in the return value.

    Args:
        config: Experiment configuration to run.
        samples: All samples of the dataset.
        dataset_name: Dataset key; combined with ``config.config_id`` into the
            checkpoint id.
        max_parallel: Thread-pool size.
        shared_retriever: Pre-initialized retriever (reused across datasets).
        shared_gen_client: Pre-initialized OpenAI client for agent generation.

    Returns:
        Every result stored in the checkpoint for this config/dataset — the
        previously cached ones plus those produced by this call.
    """
    checkpoint_id = get_checkpoint_id(config.config_id, dataset_name)
    ckpt = CheckpointManager(checkpoint_id)

    # Save experiment metadata
    ckpt.set_metadata("config_id", config.config_id)
    ckpt.set_metadata("dataset_name", dataset_name)
    ckpt.set_metadata("mode", config.mode.value)
    ckpt.set_metadata("description", config.description)
    ckpt.set_metadata("total_samples", len(samples))

    # Resume: skip samples that already have a stored result.
    completed_ids = ckpt.get_completed_ids()
    remaining = [s for s in samples if s.sample_id not in completed_ids]
    if not remaining:
        return [AgentAnswer(**r) for r in ckpt.get_all_results()]

    # OpenAI client for agent execution, shared by all worker threads.
    client = OpenAI(
        base_url=config.agent_base_url,
        api_key=config.agent_api_key,
        timeout=120,
    )

    def _process_one(sample: BenchmarkSample) -> AgentAnswer:
        """
        Full end-to-end pipeline for one sample, run inside a single worker thread.

        1. Agent generation:
             - RETRIEVAL : search on the shared retriever (no LLM tokens)
             - RAG       : search → LLM call to synthesise the agent spec
             - AutoGen   : LLM call to build the agent spec
             - Baseline  : fixed generic spec, no calls
        2. Agent execution: LLM call with the generated agent's system prompt
           to answer the benchmark question.
        """
        if config.mode != ExperimentMode.RETRIEVAL or shared_retriever is None:
            # RAG / AutoGen / Baseline: generation + execution in one call.
            return process_sample(
                sample,
                config,
                client,
                shared_retriever=shared_retriever,
                shared_gen_client=shared_gen_client,
            )

        # Step 1 — search the shared index (no LLM tokens).
        # NOTE(review): assumes AgentRetriever.search is safe to call from
        # several threads at once — not verifiable from this file.
        t0 = time.perf_counter()
        hits = shared_retriever.search(sample.question[:200], top_k=1)
        # Recorded whether or not the search found anything, matching
        # get_agent_via_retrieval's timing.
        retrieval_time = time.perf_counter() - t0

        if hits:
            agent = hits[0].agent
            agent_spec = {
                "agent_id": agent.agent_id,
                "display_name": agent.display_name,
                "persona": agent.persona,
                "description": agent.description,
                "tools": agent.tools,
            }
        else:
            agent_spec = {
                "agent_id": "fallback",
                "display_name": "General Assistant",
                "persona": "A helpful AI assistant.",
                "description": "Answers questions accurately.",
                "tools": [],
            }

        # Step 2 — agent execution (LLM call)
        answer = run_agent_on_sample(
            agent_spec=agent_spec,
            sample=sample,
            client=client,
            model=config.agent_model,
            temperature=config.agent_temperature,
            max_tokens=config.agent_max_tokens,
        )
        answer.retrieval_time = retrieval_time
        answer.latency_seconds = retrieval_time + answer.execution_time
        return answer

    done = 0
    correct = 0
    errors = 0
    desc = f"{config.config_id}/{dataset_name}"
    # Context managers guarantee the progress bar is closed and the pool shut
    # down even if writing a checkpoint raises.
    with (
        tqdm(total=len(remaining), desc=desc, unit="sample", leave=True) as pbar,
        ThreadPoolExecutor(max_workers=max_parallel) as executor,
    ):
        future_to_sample = {executor.submit(_process_one, sample): sample for sample in remaining}
        for future in as_completed(future_to_sample):
            try:
                answer = future.result()
            except Exception as e:  # one failing sample must not abort the run
                sample = future_to_sample[future]
                answer = AgentAnswer(
                    sample_id=sample.sample_id,
                    predicted_answer="",
                    correct_answer=sample.correct_answer,
                    is_correct=False,
                    # str(e) can be empty (e.g. a bare TimeoutError()); repr keeps
                    # the record identifiable as an error.
                    error=str(e) or repr(e),
                )
            # Saved from this (main) thread only, outside the try, so a
            # checkpoint failure is not mislabelled as a sample error.
            ckpt.save_result(answer)

            # O(1) running totals instead of rescanning all results each time.
            done += 1
            if answer.is_correct:
                correct += 1
            if answer.error:
                errors += 1
            pbar.update(1)
            pbar.set_postfix({"acc": f"{correct / done:.1%}", "err": errors})

    # Combine with previously cached results
    return [AgentAnswer(**r) for r in ckpt.get_all_results()]
# ── Main orchestrator ─────────────────────────────────────────────────────────
def load_datasets(max_samples: int | None = MAX_SAMPLES_PER_SUBJECT) -> dict[str, list[BenchmarkSample]]:
    """
    Load the MMLU, BBH and BIG-bench subsets used by the experiments.

    Args:
        max_samples: Per-subject / per-task cap forwarded to each loader;
            ``None`` (the default) loads every sample.

    Returns:
        Dict with keys ``"mmlu"``, ``"bbh"`` and ``"bigbench"`` mapping to the
        loaded samples.
    """
    mmlu = load_mmlu(
        subjects=MMLU_SUBJECTS_SUBSET,
        max_samples_per_subject=max_samples,
    )
    bbh = load_bbh(
        tasks=BBH_TASKS_SUBSET,
        max_samples_per_task=max_samples,
    )
    bigbench = load_bigbench(
        tasks=BIGBENCH_TASKS_SUBSET,
        max_samples_per_task=max_samples,
    )
    return {"mmlu": mmlu, "bbh": bbh, "bigbench": bigbench}
def _init_shared_resources(
    config: ExperimentConfig,
) -> tuple["AgentRetriever | None", "OpenAI | None"]:
    """
    Build the per-config resources that are reused across every dataset.

    Returns:
        ``(shared_retriever, shared_gen_client)``. The retriever is built and
        initialized only for RETRIEVAL/RAG modes, the generation client only
        for RAG/AUTOGEN modes; entries the mode does not need are ``None``.
    """
    shared_retriever = None
    shared_gen_client = None

    if config.mode in (ExperimentMode.RETRIEVAL, ExperimentMode.RAG):
        from retrieval.config import RetrievalConfig
        from retrieval.retriever import AgentRetriever

        retrieval_config = RetrievalConfig(
            dataset_type=config.dataset_type,
            embedding_model=config.embedding_model,
            top_k=config.top_k,
            use_reranker=config.use_reranker,
            # A concrete model name is passed even when reranking is disabled.
            reranker_model=config.reranker_model if config.use_reranker else "BAAI/bge-reranker-base",
            rerank_top_k=config.rerank_top_k,
        )
        shared_retriever = AgentRetriever(retrieval_config, verbose=False)
        shared_retriever.initialize()

    if config.mode in (ExperimentMode.RAG, ExperimentMode.AUTOGEN):
        shared_gen_client = OpenAI(
            base_url=config.agent_base_url,
            api_key=config.agent_api_key,
            timeout=120,
        )

    return shared_retriever, shared_gen_client


def run_all_experiments(
    configs: list[ExperimentConfig] | None = None,
    datasets: dict[str, list[BenchmarkSample]] | None = None,
    max_parallel: int = MAX_PARALLEL,
    max_parallel_large: int = MAX_PARALLEL_LARGE,
    large_dataset_threshold: int = LARGE_DATASET_THRESHOLD,
) -> dict[str, list[AgentAnswer]]:
    """
    Run all experiment configurations on all datasets.

    Adaptive parallelism: datasets with more than ``large_dataset_threshold``
    samples are processed with ``max_parallel_large`` threads; smaller ones
    use ``max_parallel``. Checkpoint files are keyed by config_id+dataset_name
    and are unaffected by changing parallelism, so already-saved results are
    preserved and resumed.

    Args:
        configs: Configurations to run (default: ALL_CONFIGS).
        datasets: Datasets to run on (default: ``load_datasets()``).
        max_parallel: Thread count for datasets at or below the threshold.
        max_parallel_large: Thread count for datasets above the threshold.
        large_dataset_threshold: Sample count separating the two cases.

    Returns:
        Dict mapping the checkpoint id (config_id + dataset_name) -> list of AgentAnswer
    """
    if datasets is None:
        datasets = load_datasets()
    if configs is None:
        configs = ALL_CONFIGS

    all_results: dict[str, list[AgentAnswer]] = {}
    for config in configs:
        # Drop the previous config's resources before building new ones so two
        # retrievers (with their loaded embeddings/indexes) are never held at once.
        shared_retriever = shared_gen_client = None
        shared_retriever, shared_gen_client = _init_shared_resources(config)

        for ds_name, samples in datasets.items():
            # Adaptive parallelism: use a larger thread pool for big datasets
            effective_parallel = max_parallel_large if len(samples) > large_dataset_threshold else max_parallel
            key = get_checkpoint_id(config.config_id, ds_name)
            all_results[key] = run_single_experiment(
                config=config,
                samples=samples,
                dataset_name=ds_name,
                max_parallel=effective_parallel,
                shared_retriever=shared_retriever,
                shared_gen_client=shared_gen_client,
            )
    return all_results
# ── Entry point ───────────────────────────────────────────────────────────────
def _report(all_results: dict[str, list[AgentAnswer]], results_dir: Path) -> None:
    """
    Print a single metrics table and save the metrics report.

    Metrics are computed only for keys with at least one result; the saved
    report receives ``all_results`` unchanged.
    """
    metrics_dict = {key: compute_metrics(results) for key, results in all_results.items() if results}
    print_metrics_table(metrics_dict)
    save_metrics_report(all_results, results_dir)


def main():
    """Parse CLI arguments, then run the experiments or report on existing checkpoints."""
    import argparse

    parser = argparse.ArgumentParser(description="Run benchmark experiments")
    parser.add_argument(
        "--mode",
        choices=["all", "retrieval", "rag", "autogen", "baseline"],
        default="all",
        help="Which experiments to run",
    )
    parser.add_argument(
        "--parallel",
        type=int,
        default=MAX_PARALLEL,
        help=f"Max parallel tasks for small datasets (≤{LARGE_DATASET_THRESHOLD} samples). Default: {MAX_PARALLEL}",
    )
    parser.add_argument(
        "--parallel-large",
        type=int,
        default=MAX_PARALLEL_LARGE,
        help=f"Max parallel tasks for large datasets (>{LARGE_DATASET_THRESHOLD} samples). "
        f"Default: {MAX_PARALLEL_LARGE}",
    )
    parser.add_argument(
        "--large-threshold",
        type=int,
        default=LARGE_DATASET_THRESHOLD,
        help=f"Sample count threshold that triggers higher parallelism. Default: {LARGE_DATASET_THRESHOLD}",
    )
    parser.add_argument("--max-samples", type=int, default=None, help="Max samples per subject/task (default: all)")
    parser.add_argument("--report-only", action="store_true", help="Only generate report from existing checkpoints")
    args = parser.parse_args()

    # Anchor the output next to this script (<project>/experiments/results) so
    # it does not depend on the current working directory.
    results_dir = Path(__file__).resolve().parent / "results"

    if args.report_only:
        # Just generate report from existing checkpoints
        from experiments.checkpoint import list_checkpoints

        checkpoints = list_checkpoints()
        if not checkpoints:
            return  # nothing has been run yet, so there is nothing to report
        all_results = {
            info["experiment_id"]: [
                AgentAnswer(**r) for r in CheckpointManager(info["experiment_id"]).get_all_results()
            ]
            for info in checkpoints
        }
        _report(all_results, results_dir)
        return

    datasets = load_datasets(max_samples=args.max_samples)

    # argparse ``choices`` guarantees the key exists.
    configs_by_mode = {
        "all": ALL_CONFIGS,
        "retrieval": RETRIEVAL_CONFIGS,
        "rag": RAG_CONFIGS,
        "autogen": [AUTOGEN_CONFIG],
        "baseline": [BASELINE_CONFIG],
    }

    all_results = run_all_experiments(
        configs=configs_by_mode[args.mode],
        datasets=datasets,
        max_parallel=args.parallel,
        max_parallel_large=args.parallel_large,
        large_dataset_threshold=args.large_threshold,
    )
    _report(all_results, results_dir)


if __name__ == "__main__":
    main()