SciMLx_Production / agent_loop.py
Moatasim Farooque
Remove problematic files
54fa103
"""Autonomous Agent Orchestration Loop for SciML Research.
This is ONE of TWO supported orchestration modes — choose whichever fits your
workflow:
MODE A · External agent (RESEARCH_BRAIN.md) ← original, always supported
─────────────────────────────────────────────
An external AI agent (Claude Code, GPT-4, etc.) reads RESEARCH_BRAIN.md and drives
the research loop manually: edits experiments.yaml, calls autorun.py,
interprets results, forms hypotheses, and commits improvements.
Best for: interactive research, novel architecture ideas, steering by
intuition, or when you want full human-visible control over every decision.
MODE B · agent_loop.py (this file) ← optional in-process loop
─────────────────────────────────────────────
A fully automated in-process loop that replaces the need for an external
agent to interpret results. It reads tracker.analyze_lineage(), calls
HypothesisEngine, runs Bayesian HPO, and appends new ExperimentConfigs to
experiments.yaml — all without human intervention.
Best for: overnight runs, saturating the queue automatically after an
external agent session, or scaling up experiment throughput.
Both modes share the same infrastructure (experiments.yaml queue, results.json,
autorun.py runner) and can be used interchangeably or together.
See RESEARCH_BRAIN.md for the authoritative guide to operating this system.
Steps performed in Mode B:
1. Analyses current state via tracker.analyze_lineage()
2. Identifies failure patterns via hypothesis.HypothesisEngine
3. Generates next experiments via auto_suggest + bayesian_hpo
4. Writes new ExperimentConfig entries to experiments.yaml (gated)
5. Optionally triggers autorun.py for the next batch
Usage:
uv run agent_loop.py --dry-run # plan without writing
uv run agent_loop.py --benchmark kdv_1d # focus on one benchmark
uv run agent_loop.py --run # generate + immediately run top-3
uv run agent_loop.py --top 5 # generate top-N new configs
The loop checks .autorun_pause before each action and respects it.
See program.md for the external-agent (Mode A) workflow.
"""
import argparse
import json
import math
import subprocess
import sys
import textwrap
from pathlib import Path
from typing import Optional
import numpy as np
from core.utils import REPO_ROOT
from core.hpo import _normalize, PARAM_KEYS
# ── Imports (lazy to avoid MLX startup cost when just planning) ───────────────
def _load_tracker():
    """Build and return a fresh ``core.tracker.Tracker``.

    The import lives inside the function so that importing this module does
    not pull in ``core.tracker`` until a tracker is actually requested.
    """
    from core.tracker import Tracker

    tracker = Tracker()
    return tracker
def _load_hypothesis():
    """Build and return a fresh ``core.hypothesis.HypothesisEngine``.

    ``core.hypothesis`` is imported on first call rather than at module load.
    """
    from core.hypothesis import HypothesisEngine

    engine = HypothesisEngine()
    return engine
def _load_hpo(benchmark: str, model: str = "FNO"):
    """Return a ``BayesianHPO`` for *benchmark*/*model* with its history loaded.

    Args:
        benchmark: Benchmark name passed straight to ``BayesianHPO``.
        model: Model name passed straight to ``BayesianHPO`` (default "FNO").

    Returns:
        The optimiser instance, after ``load_history()`` has been called on it.
    """
    from core.hpo import BayesianHPO

    optimiser = BayesianHPO(benchmark, model)
    optimiser.load_history()
    return optimiser
# ── State checks ──────────────────────────────────────────────────────────────
def is_paused() -> bool:
    """Return True when the ``.autorun_pause`` marker exists in the repo root."""
    pause_marker = REPO_ROOT / ".autorun_pause"
    return pause_marker.exists()
def current_sota_gaps() -> dict[str, float]:
    """Map each benchmark to ``our_best / sota``, rounded to 3 decimals.

    A ratio below 1 means our best result beats the recorded SOTA value.
    Benchmarks for which ``best_per_benchmark`` yields no (or a falsy) value
    are left out of the mapping.
    """
    from core.utils import SOTA, load_results, best_per_benchmark

    ours_by_benchmark = best_per_benchmark(load_results())

    ratios = {}
    for name, reference in SOTA.items():
        ours = ours_by_benchmark.get(name)
        if not ours:
            # Nothing recorded for this benchmark yet.
            continue
        ratios[name] = round(ours / reference, 3)
    return ratios
def pending_count() -> int:
    """Count queued experiments whose name is not yet in the done set."""
    from core.loader import get_experiments
    from core.utils import done_names

    finished = done_names()
    outstanding = [exp for exp in get_experiments() if exp.name not in finished]
    return len(outstanding)
# ── Core analysis ─────────────────────────────────────────────────────────────
def analyse_state(benchmark: Optional[str] = None) -> dict:
    """Collect lineage analysis, SOTA gaps, queue size and suggested interventions.

    Args:
        benchmark: Forwarded to ``tracker.analyze_lineage``; ``None`` is passed
            through unchanged.

    Returns:
        A dict with the keys ``analysis``, ``sota_gaps``,
        ``pending_experiments``, ``priority_order`` (list of
        ``(benchmark, gap, runs)`` tuples, highest priority first) and
        ``interventions`` (benchmark -> intervention, top five benchmarks only).
    """
    tracker = _load_tracker()
    hypothesis_engine = _load_hypothesis()
    lineage = tracker.analyze_lineage(benchmark)
    sota_ratios = current_sota_gaps()
    n_pending = pending_count()

    # Rank benchmarks: a large gap to SOTA combined with few runs scores highest.
    # Benchmarks without a known gap get an infinite gap and therefore sort first.
    scored = []
    for bm, summary in lineage.get("benchmark_summaries", {}).items():
        ratio = sota_ratios.get(bm, float("inf"))
        n_runs = summary.get("n_experiments", 0)
        scored.append((ratio * max(1, 10 - n_runs), bm, ratio, n_runs))
    ranked = sorted(scored, reverse=True)

    # Ask the hypothesis engine for a next step on the five top-ranked benchmarks.
    suggested = {}
    for _score, bm, _ratio, _n_runs in ranked[:5]:
        report = hypothesis_engine.analyze_benchmark(bm)
        best_so_far = report.get("best_val", float("inf"))
        intervention = hypothesis_engine.suggest_intervention(bm, best_so_far)
        if intervention:
            suggested[bm] = intervention

    return {
        "analysis": lineage,
        "sota_gaps": sota_ratios,
        "pending_experiments": n_pending,
        "priority_order": [(bm, ratio, n_runs) for _score, bm, ratio, n_runs in ranked],
        "interventions": suggested,
    }
# ── Config generation ─────────────────────────────────────────────────────────
def _name_from_config(benchmark: str, model: str, cfg: dict) -> str:
    """Derive the experiment name for an agent-generated configuration.

    The name has the form ``agent_<model>_<bench>_h<H>_l<L>_m<M>`` where
    ``<model>`` is lower-cased, ``<bench>`` is the first five characters of
    *benchmark*, and H/L/M come from ``hidden_dim`` / ``n_layers`` /
    ``n_modes`` in *cfg* (defaults 64 / 4 / 16).

    NOTE(review): only five characters of the benchmark are kept, so two
    benchmarks that share a five-character prefix map to the same name.
    """
    width = cfg.get("hidden_dim", 64)
    depth = cfg.get("n_layers", 4)
    modes = cfg.get("n_modes", 16)
    parts = ("agent", model.lower(), benchmark[:5], f"h{width}", f"l{depth}", f"m{modes}")
    return "_".join(parts)
def _build_proposal(name: str, benchmark: str, model: str, cfg: dict, *,
                    loss_type: str, priority: int, rationale: str,
                    source: str, paper_ref: Optional[str] = None) -> dict:
    """Assemble one proposal dict in the shape the rest of this module reads.

    ``budget_s`` is 480 when the benchmark name contains "2d", otherwise 300.
    ``paper_ref`` is only included when one is supplied.
    """
    proposal = {
        "name": name,
        "benchmark": benchmark,
        "model": model,
        "hidden_dim": cfg["hidden_dim"],
        "n_layers": cfg["n_layers"],
        "n_modes": cfg["n_modes"],
        "loss_type": loss_type,
        "budget_s": 480 if "2d" in benchmark else 300,
        "priority": priority,
        "rationale": rationale,
    }
    if paper_ref is not None:
        proposal["paper_ref"] = paper_ref
    proposal["source"] = source
    return proposal


def generate_new_configs(state: dict, top_n: int = 5, no_hpo: bool = False) -> list[dict]:
    """Propose new experiment configs from hypothesis interventions and Bayesian HPO.

    Args:
        state: Result of ``analyse_state``; ``interventions`` and (unless
            *no_hpo*) ``priority_order`` are read from it.
        top_n: Maximum number of proposals to return. Zero or a negative value
            yields an empty list.
        no_hpo: When true, the Bayesian HPO source is skipped entirely.

    Returns:
        Up to *top_n* proposal dicts with unique names, hypothesis-driven
        proposals (priority 1) first, then HPO proposals (priority 2). Names
        already present in the done set are never proposed.
    """
    # Without this guard the cap below is only checked after the first append,
    # so top_n <= 0 would still return one proposal.
    if top_n <= 0:
        return []

    from core.utils import done_names

    done = done_names()
    configs = []

    # Source 1: hypothesis-driven interventions (highest confidence).
    # NOTE(review): an "lr" value on the intervention is not carried into the
    # proposal; the proposal dict has no lr field.
    for bm, interv in state["interventions"].items():
        model = interv.get("model", "FNO")
        cfg = {
            "hidden_dim": interv.get("hidden_dim", 128),
            "n_layers": interv.get("n_layers", 8),
            "n_modes": interv.get("n_modes", 24),
        }
        name = _name_from_config(bm, model, cfg)
        if name in done:
            continue
        configs.append(_build_proposal(
            name, bm, model, cfg,
            loss_type=interv.get("loss_type", "l2_rel"),
            priority=1,
            rationale=interv.get("rationale", "Hypothesis-engine suggestion"),
            paper_ref=interv.get("paper_ref", ""),
            source="hypothesis",
        ))

    # Source 2: Bayesian HPO suggestions for the three top-priority benchmarks.
    hpo_targets = [] if no_hpo else state["priority_order"][:3]
    for bm, gap, _runs in hpo_targets:
        if gap < 1.0:
            continue  # already beating SOTA, deprioritise
        try:
            hpo = _load_hpo(bm)
            if len(hpo.y) < 2:
                continue  # not enough data for a meaningful GP
            for _ in range(2):
                cfg = hpo.ask()
                name = _name_from_config(bm, "FNO", cfg)
                if name in done:
                    continue
                configs.append(_build_proposal(
                    name, bm, "FNO", cfg,
                    loss_type="l2_rel",
                    priority=2,
                    rationale=f"Bayesian HPO suggestion (GP-EI, {len(hpo.y)} obs on {bm})",
                    source="bayesian_hpo",
                ))
        except Exception as exc:
            # HPO is best-effort: keep going with the other benchmarks, but
            # report the failure instead of hiding it.
            print(f" WARN HPO suggestion failed for {bm}: {exc!r}", file=sys.stderr)

    # Deduplicate by name (first occurrence wins) and cap at top_n.
    seen = set()
    unique = []
    for candidate in configs:
        name = candidate["name"]
        if name in seen or name in done:
            continue
        seen.add(name)
        unique.append(candidate)
        if len(unique) >= top_n:
            break
    return unique
# ── Code generation ───────────────────────────────────────────────────────────
def _config_to_code(cfg: dict) -> str:
    """Render *cfg* as the text of one ``ExperimentConfig(...)`` entry.

    ``name``, ``benchmark``, ``model``, ``hidden_dim``, ``n_layers``,
    ``n_modes`` and ``priority`` are always emitted (and must be present).
    ``loss_type`` and ``budget_s`` are emitted only when they differ from
    "l2_rel" and 300 respectively; ``rationale`` (shortened to 90 characters)
    and ``paper_ref`` only when non-empty.

    Returns:
        The entry as newline-joined lines, without a trailing newline.
    """
    rendered = [
        " ExperimentConfig(",
        f" name={cfg['name']!r},",
        f" benchmark={cfg['benchmark']!r},",
        f" model={cfg['model']!r},",
        f" hidden_dim={cfg['hidden_dim']}, n_layers={cfg['n_layers']}, n_modes={cfg['n_modes']},",
    ]

    loss_type = cfg.get("loss_type", "l2_rel")
    if loss_type != "l2_rel":
        rendered.append(f" loss_type={loss_type!r},")

    budget = cfg.get("budget_s", 300)
    if budget != 300:
        rendered.append(f" budget_s={budget},")

    rendered.append(f" priority={cfg['priority']},")

    rationale = cfg.get("rationale")
    if rationale:
        short = textwrap.shorten(rationale, width=90)
        rendered.append(f" rationale={short!r},")

    paper_ref = cfg.get("paper_ref")
    if paper_ref:
        rendered.append(f" paper_ref={paper_ref!r},")

    rendered.append(" ),")
    return "\n".join(rendered)
def _config_novelty(cfg: dict, existing_vectors: list[np.ndarray],
                    threshold: float = 0.97) -> bool:
    """Decide whether *cfg* is far enough from every vector in *existing_vectors*.

    *cfg* is mapped to a vector with ``_normalize`` and compared by cosine
    similarity against each existing vector.

    Returns:
        ``False`` as soon as one similarity is >= *threshold* (near-duplicate),
        otherwise ``True``. Also ``True`` when there is nothing to compare
        against or when the candidate vector has (near-)zero length; existing
        vectors of (near-)zero length are ignored.
    """
    if not existing_vectors:
        return True

    candidate = _normalize(cfg)
    candidate_len = np.linalg.norm(candidate)
    if candidate_len < 1e-10:
        return True

    def _too_similar(other) -> bool:
        other_len = np.linalg.norm(other)
        if other_len < 1e-10:
            return False
        cosine = float(np.dot(candidate, other) / (candidate_len * other_len))
        return cosine >= threshold

    return not any(_too_similar(vec) for vec in existing_vectors)
def _build_existing_vectors(content: str, done_results: set[str]) -> list[np.ndarray]:
    """Collect hyperparameter vectors for the experiments listed in *content*.

    *content* is the text of experiments.yaml. It is scanned textually (not
    with a YAML parser): every entry that starts with a ``- name:`` list item
    at any indentation, including one on the very first line, or with an
    ``ExperimentConfig(`` call as written by ``append_configs_to_experiments``,
    is searched for ``hidden_dim``, ``n_layers``, ``n_modes`` and ``lr`` given
    either as ``key: value`` or ``key=value``.

    Args:
        content: Raw text of the experiment queue file.
        done_results: Accepted for interface compatibility; currently unused.

    Returns:
        One ``_normalize``-d vector per entry in which at least one of the
        four keys was found, in file order.
    """
    import re

    entry_start = re.compile(r"^[ \t]*-[ \t]+name:|ExperimentConfig\(", re.MULTILINE)
    number = r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?"
    # \b keeps e.g. "min_lr:" from being read as "lr:".
    key_patterns = {
        key: re.compile(rf"\b{key}\s*[:=]\s*({number})")
        for key in ("hidden_dim", "n_layers", "n_modes", "lr")
    }

    vectors: list[np.ndarray] = []
    # The first segment is whatever precedes the first entry (the preamble).
    for segment in entry_start.split(content)[1:]:
        cfg: dict = {}
        for key, pattern in key_patterns.items():
            found = pattern.search(segment)
            if found:
                cfg[key] = float(found.group(1))
        if cfg:
            vectors.append(_normalize(cfg))
    return vectors
def _queue_contains_name(name: str, content: str) -> bool:
    """Return True if *content* already holds an entry named exactly *name*.

    Recognises both ``name: foo`` (optionally quoted) and ``name='foo',`` and
    does not treat *name* as a match when it is only a prefix of a longer name.
    """
    import re

    quote = "['\"]?"
    pattern = rf"\bname\s*[:=]\s*{quote}{re.escape(name)}{quote}[ \t]*(?:[,#]|$)"
    return re.search(pattern, content, flags=re.MULTILINE) is not None


def append_configs_to_experiments(configs: list[dict]) -> int:
    """Append the acceptable entries of *configs* to experiments.yaml.

    A config is skipped (with a printed reason) when it lacks a field needed
    for rendering, is already in the done set, is already named in the file,
    or is a near-duplicate (per ``_config_novelty``) of an existing or
    just-accepted config. Accepted configs are rendered with
    ``_config_to_code`` and inserted, under a timestamped comment, before the
    last ``"\\n]"`` in the file.

    Args:
        configs: Proposal dicts as produced by ``generate_new_configs``.

    Returns:
        The number of configs written; 0 if nothing was written, including the
        case where no closing ``]`` is found in the file.
    """
    if not configs:
        return 0

    from datetime import datetime

    from core.utils import done_names

    exp_path = REPO_ROOT / "experiments.yaml"
    # Explicit encoding: the inserted header contains non-ASCII characters.
    content = exp_path.read_text(encoding="utf-8")

    done = done_names()
    existing_vectors = _build_existing_vectors(content, done)

    # Everything _config_to_code reads unconditionally must be present,
    # otherwise rendering raises KeyError after validation has passed.
    required = ("name", "benchmark", "model", "hidden_dim", "n_layers",
                "n_modes", "priority")

    to_add = []
    for cfg in configs:
        if not all(key in cfg for key in required):
            print(f" SKIP {cfg.get('name','?')} — missing required fields")
            continue
        if cfg["name"] in done:
            print(f" SKIP {cfg['name']} — already in results")
            continue
        if _queue_contains_name(cfg["name"], content):
            print(f" SKIP {cfg['name']} — already in experiments.yaml")
            continue
        if not _config_novelty(cfg, existing_vectors):
            print(f" SKIP {cfg['name']} — near-duplicate of existing config (cosine sim ≥ 0.97)")
            continue
        existing_vectors.append(_normalize(cfg))  # avoid two near-dupes from the same batch
        to_add.append(cfg)

    if not to_add:
        return 0

    # Build the insertion block: a timestamped header followed by the entries.
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    header = f"\n # ── Agent-generated experiments ({stamp}) ──────────────────────────────\n"
    block = header + "".join(_config_to_code(cfg) + "\n" for cfg in to_add)

    # Insert before the closing `]` of EXPERIMENTS.
    insertion_point = content.rfind("\n]")
    if insertion_point == -1:
        print(" ERROR: could not find EXPERIMENTS closing ] in experiments.yaml")
        return 0

    new_content = content[:insertion_point] + block + content[insertion_point:]
    exp_path.write_text(new_content, encoding="utf-8")
    print(f" Appended {len(to_add)} new configs to experiments.yaml")
    return len(to_add)
# ── Reporting ─────────────────────────────────────────────────────────────────
def print_state_report(state: dict) -> None:
    """Print a human-readable summary of *state* to stdout.

    Args:
        state: Dict as returned by ``analyse_state``. ``sota_gaps`` and
            ``pending_experiments`` are required; ``priority_order`` and
            ``interventions`` are optional and their sections are omitted
            when empty.
    """
    gaps = state["sota_gaps"]
    pending = state["pending_experiments"]
    rule = "═" * 65

    print(f"\n{rule}")
    print(" SciML Agent Loop — State Report")
    print(rule)
    print(f"\n Pending experiments in queue: {pending}")

    print("\n SOTA gaps (our_best / sota — <1.0 means beating SOTA):")
    # Largest gap first.
    for bm, gap in sorted(gaps.items(), key=lambda item: -item[1]):
        # One bar segment per 0.2 of ratio, capped at 20 segments.
        bar = "█" * min(20, int(gap * 5)) if gap > 0.1 else ""
        flag = " ← BEATS SOTA" if gap < 1.0 else ""
        print(f" {bm:<25} {gap:>6.2f}x {bar}{flag}")

    order = state.get("priority_order", [])
    if order:
        print("\n Research priority order:")
        for rank, (bm, gap, runs) in enumerate(order[:5], 1):
            print(f" {rank}. {bm:<25} gap={gap:.2f}x runs={runs}")

    interventions = state.get("interventions", {})
    if interventions:
        print("\n Hypothesis-engine interventions:")
        for bm, interv in interventions.items():
            model = interv.get("model", "FNO")
            rat = textwrap.shorten(interv.get("rationale", ""), width=65)
            print(f" {bm}: → {model} {rat}")
    print()
def print_config_proposals(configs: list[dict]) -> None:
    """Print the generated proposals to stdout, one short paragraph each.

    For every proposal the source and name, the benchmark/model with its
    h/l/m sizes, and the rationale (shortened to 70 characters) are shown,
    followed by a blank line. An empty *configs* prints a single notice.
    """
    if configs:
        print(f"\n Generated {len(configs)} new ExperimentConfig proposals:\n")
        for proposal in configs:
            print(f" [{proposal['source']}] {proposal['name']}")
            sizes = f"h={proposal['hidden_dim']} l={proposal['n_layers']} m={proposal['n_modes']}"
            print(f" {proposal['benchmark']} / {proposal['model']} " + sizes)
            why = textwrap.shorten(proposal.get("rationale", ""), width=70)
            print(f" {why}")
            print()
        return
    print(" No new configs generated.")
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point for the in-process agent loop.

    Parses the CLI flags, analyses the current state, prints the report and
    the generated proposals, and then, unless ``--dry-run`` is given, appends
    the proposals to experiments.yaml. With ``--run`` it finally launches
    ``autorun.py`` via ``uv run``.

    The ``.autorun_pause`` marker is checked before each action (analysis,
    writing the queue, launching autorun); if it is present the process exits
    with status 0 without performing that action. A non-zero exit status from
    autorun.py is propagated as this process's exit status.
    """
    parser = argparse.ArgumentParser(description="SciML Agent Orchestration Loop")
    parser.add_argument("--benchmark", default=None,
                        help="Focus on a specific benchmark")
    parser.add_argument("--top", type=int, default=5,
                        help="Number of new configs to generate")
    parser.add_argument("--dry-run", action="store_true",
                        help="Analyse and plan without writing to experiments.yaml")
    parser.add_argument("--run", action="store_true",
                        help="After generating configs, immediately run top-3 via autorun.py")
    parser.add_argument("--no-hpo", action="store_true",
                        help="Skip Bayesian HPO suggestions (faster startup)")
    args = parser.parse_args()

    if is_paused():
        print(" Loop is paused (.autorun_pause exists). Remove it to resume.")
        sys.exit(0)

    print("Analysing current state...")
    state = analyse_state(args.benchmark)
    print_state_report(state)

    print("Generating new experiment proposals...")
    configs = generate_new_configs(state, top_n=args.top, no_hpo=args.no_hpo)
    print_config_proposals(configs)

    if args.dry_run:
        print(" [dry-run] Not writing to experiments.yaml.")
        print(" Proposed ExperimentConfig snippets:\n")
        for cfg in configs:
            print(_config_to_code(cfg))
            print()
        return

    # Analysis and HPO can take a while; honour a pause requested meanwhile.
    if is_paused():
        print(" Loop is paused (.autorun_pause exists). Remove it to resume.")
        sys.exit(0)

    n_added = append_configs_to_experiments(configs)
    if n_added == 0:
        print(" Nothing new to add — queue already covers all suggestions.")
        return

    if args.run:
        if is_paused():
            print(" Loop is paused (.autorun_pause exists). Remove it to resume.")
            sys.exit(0)
        print("\nLaunching autorun.py for top-3 new experiments...")
        completed = subprocess.run(
            ["uv", "run", "autorun.py", "--max", "3",
             "--priority", "1", "--commit"],
            cwd=REPO_ROOT,
        )
        if completed.returncode != 0:
            print(f" autorun.py exited with status {completed.returncode}", file=sys.stderr)
            sys.exit(completed.returncode)


if __name__ == "__main__":
    main()