Spaces:
Runtime error
Runtime error
| """Discovery Engine Tracker — DAG-based experiment lineage for SciML autoresearch. | |
| Manages results.json (the SSoT for lineage). | |
| Tracks branching via parent_id and structured rationale/conclusions. | |
| """ | |
| import json | |
| import time | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, List | |
| # ── Path Constants ──────────────────────────────────────────────────────────── | |
| from core.utils import REPO_ROOT | |
# Path of the lineage file: "results.json" directly under REPO_ROOT.
# Used as the default `json_path` argument of Tracker.__init__.
RESULTS_JSON = REPO_ROOT / "results.json"
class Tracker:
    """In-memory view of the experiment lineage with append-only persistence.

    Every experiment is a plain dict ("node"). Nodes are linked into a tree
    through their ``parent_id`` field. Loading and saving are delegated to
    ``core.results_store.store``.

    Attributes:
        json_path: Path given at construction time.
            NOTE(review): no method of this class reads it; all persistence
            goes through ``core.results_store.store`` -- confirm whether the
            store is expected to honor this path.
        experiments: List of experiment nodes as returned by ``store.load()``,
            followed by any nodes logged through this instance.
    """

    # Config fields correlated against val_l2_rel in analyze_lineage().
    _HP_FIELDS = ("hidden_dim", "n_layers", "n_modes", "lr", "batch_size")

    def __init__(self, json_path: Path = RESULTS_JSON):
        """Remember ``json_path`` and load the existing experiments."""
        self.json_path = json_path
        self.experiments = []
        self._load()

    def _load(self):
        """Replace ``self.experiments`` with whatever the results store loads."""
        # Imported lazily, as in the original code.
        from core.results_store import store
        self.experiments = store.load()

    def _save(self):
        """Append the most recently added experiment to the results store.

        Only the last node is handed to ``store.append``; nothing is written
        when there are no experiments.
        """
        from core.results_store import store
        if self.experiments:
            store.append(self.experiments[-1])

    def _find_parent_by_name(self, parent_name: str) -> Optional[str]:
        """Return the id of the newest experiment identified by ``parent_name``.

        An exact ``config["name"]`` match anywhere in the history takes
        priority over the legacy description-prefix match, so a newer record
        that merely shares a description prefix cannot shadow the record that
        is actually named ``parent_name``.
        """
        # Pass 1: exact config name (reliable).
        for exp in reversed(self.experiments):
            if (exp.get("config") or {}).get("name", "") == parent_name:
                return exp.get("id")
        # Pass 2: description prefix (legacy records).
        for exp in reversed(self.experiments):
            # `or ""` guards against a description stored as None.
            if (exp.get("description") or "").startswith(parent_name):
                return exp.get("id")
        return None

    def _last_keep_id(self, benchmark: str) -> Optional[str]:
        """Return the id of the newest ``status == 'keep'`` node for ``benchmark``."""
        for exp in reversed(self.experiments):
            if exp.get("benchmark") == benchmark and exp.get("status") == "keep":
                return exp.get("id")
        return None

    def log_experiment(self,
                       benchmark: str,
                       model: str,
                       val_l2_rel: float,
                       memory_gb: float,
                       status: str,
                       description: str,
                       commit: str,
                       parent_id: Optional[str] = None,
                       parent_name: Optional[str] = None,
                       config: Optional[Dict] = None,
                       rationale: str = "",
                       conclusion: str = "",
                       diag: Optional[Dict] = None):
        """Log a new experiment node to the discovery tree.

        The parent is resolved in this order: the explicit ``parent_id``;
        otherwise the newest experiment matching ``parent_name`` (exact
        ``config["name"]`` first, description prefix second); otherwise the
        newest experiment of the same ``benchmark`` whose status is ``"keep"``.
        If none applies, ``parent_id`` is stored as ``None``.

        Args:
            benchmark, model, val_l2_rel, memory_gb, status, description,
                commit, rationale, conclusion: Stored verbatim on the node.
            parent_id: Explicit parent node id, if known.
            parent_name: Name used to look up the parent when no id is given.
            config: Stored on the node; ``None`` becomes ``{}``.
            diag: Stored on the node; ``None`` becomes ``{}``.

        Returns:
            The generated id, formatted as
            ``"{benchmark}_{model}_{unix_seconds}_{6 hex chars}"``.
        """
        import uuid as _uuid

        # Read the clock once so the id and the timestamp field always agree.
        now = int(time.time())
        exp_id = f"{benchmark}_{model}_{now}_{_uuid.uuid4().hex[:6]}"

        if not parent_id and parent_name:
            parent_id = self._find_parent_by_name(parent_name)
        if not parent_id:
            parent_id = self._last_keep_id(benchmark)

        node = {
            "id": exp_id,
            "parent_id": parent_id,
            "timestamp": now,
            "benchmark": benchmark,
            "model": model,
            "val_l2_rel": val_l2_rel,
            "memory_gb": memory_gb,
            "status": status,
            "description": description,
            "commit": commit,
            "config": config or {},
            "rationale": rationale,
            "conclusion": conclusion,
            "diag": diag or {},
        }
        self.experiments.append(node)
        self._save()
        return exp_id

    def get_lineage(self) -> List[Dict]:
        """Return the internal list of experiment nodes (not a copy)."""
        return self.experiments

    def get_experiment(self, exp_id: str) -> Optional[Dict]:
        """Return the first node whose ``id`` equals ``exp_id``, else ``None``."""
        for exp in self.experiments:
            if exp.get("id") == exp_id:
                return exp
        return None

    @staticmethod
    def _has_valid_score(exp: Dict) -> bool:
        """True when ``val_l2_rel`` is present, non-zero and inside (0, 10)."""
        val = exp.get("val_l2_rel")
        return bool(val) and 0 < val < 10.0

    @classmethod
    def _hp_correlations(cls, valid: List[Dict]) -> Dict[str, float]:
        """Pearson correlation of each hyperparameter with ``val_l2_rel``.

        A field is reported only when at least three experiments carry a
        float-convertible value for it and both series have non-zero spread.
        """
        importance = {}
        for field in cls._HP_FIELDS:
            xs, ys = [], []
            for exp in valid:
                raw = (exp.get("config") or {}).get(field)
                if raw is None:
                    continue
                try:
                    x = float(raw)
                except (TypeError, ValueError):
                    continue  # non-numeric setting: skip this experiment
                xs.append(x)
                ys.append(exp["val_l2_rel"])
            if len(xs) >= 3:
                xa, ya = np.array(xs), np.array(ys)
                # corrcoef is undefined for a constant series.
                if xa.std() > 0 and ya.std() > 0:
                    importance[field] = round(float(np.corrcoef(xa, ya)[0, 1]), 3)
        return importance

    def analyze_lineage(self, benchmark: Optional[str] = None) -> Dict:
        """Analyze experiment history: HP importance, model ranking, trends.

        Args:
            benchmark: When truthy, only experiments of that benchmark are
                analyzed; otherwise all experiments are.

        Returns:
            A dict with ``benchmark_summaries`` (one entry per benchmark that
            has at least one valid score), ``global_patterns`` (list of
            strings), ``total_experiments`` (count of all loaded experiments,
            regardless of the ``benchmark`` filter) and ``benchmarks_covered``.
        """
        exps = self.experiments
        if benchmark:
            exps = [e for e in exps if e.get("benchmark") == benchmark]

        by_benchmark: Dict[str, list] = {}
        for exp in exps:
            by_benchmark.setdefault(exp.get("benchmark", "unknown"), []).append(exp)

        summaries = {}
        model_wins: Dict[str, int] = {}
        for bm, bm_exps in by_benchmark.items():
            valid = [e for e in bm_exps if self._has_valid_score(e)]
            if not valid:
                continue

            vals = [e["val_l2_rel"] for e in valid]
            best_exp = min(valid, key=lambda e: e["val_l2_rel"])

            # The best experiment of a benchmark is that benchmark's "winner".
            winner_model = best_exp.get("model", "?")
            model_wins[winner_model] = model_wins.get(winner_model, 0) + 1

            # Average val per model
            model_vals: Dict[str, list] = {}
            for e in valid:
                model_vals.setdefault(e.get("model", "?"), []).append(e["val_l2_rel"])
            model_avg = {m: round(sum(vs) / len(vs), 6) for m, vs in model_vals.items()}

            # Trend: does the best of the 5 newest beat the best of the 5 oldest?
            chronological = sorted(valid, key=lambda e: e.get("timestamp", 0))
            early = [e["val_l2_rel"] for e in chronological[:5]]
            recent = [e["val_l2_rel"] for e in chronological[-5:]]
            improving = bool(early and recent and min(recent) < min(early))

            summaries[bm] = {
                "n_experiments": len(valid),
                "best_val": round(best_exp["val_l2_rel"], 6),
                "best_model": best_exp.get("model"),
                "best_description": best_exp.get("description", ""),
                "mean_val": round(float(np.mean(vals)), 6),
                "std_val": round(float(np.std(vals)), 6),
                "hp_importance": self._hp_correlations(valid),
                "model_avg_val": model_avg,
                "trend": "improving" if improving else "plateaued",
            }

        # Global cross-benchmark patterns
        patterns = []
        if model_wins:
            top = max(model_wins, key=model_wins.get)
            # Denominator counts every benchmark seen, including those
            # without any valid score.
            patterns.append(f"{top} wins on {model_wins[top]}/{len(by_benchmark)} benchmarks")

        return {
            "benchmark_summaries": summaries,
            "global_patterns": patterns,
            "total_experiments": len(self.experiments),
            "benchmarks_covered": list(summaries.keys()),
        }

    def print_analysis(self, benchmark: Optional[str] = None):
        """Print a human-readable analysis report."""
        a = self.analyze_lineage(benchmark)
        print(f"\n{'='*60}")
        print(f"SciML Lineage Analysis ({a['total_experiments']} total experiments)")
        print(f"{'='*60}")
        for bm, s in a["benchmark_summaries"].items():
            print(f"\n{bm}:")
            print(f" Experiments : {s['n_experiments']}")
            print(f" Best : {s['best_val']:.6f} ({s['best_model']})")
            print(f" Trend : {s['trend']}")
            if s["hp_importance"]:
                # Show the three hyperparameters with the largest |correlation|.
                top_hp = sorted(s["hp_importance"].items(), key=lambda x: abs(x[1]), reverse=True)[:3]
                print(f" HP impact : " + ", ".join(f"{k}={v:+.2f}" for k, v in top_hp))
            if s["model_avg_val"]:
                best_m = min(s["model_avg_val"].items(), key=lambda x: x[1])
                print(f" Best model : {best_m[0]} (avg {best_m[1]:.4f})")
        if a["global_patterns"]:
            print(f"\nGlobal patterns:")
            for p in a["global_patterns"]:
                print(f" • {p}")
        print()
# Script entry point: load the lineage, report how many experiments exist,
# and, when there is at least one, show the latest result.
if __name__ == "__main__":
    t = Tracker()
    print(f"Discovery Engine Initialized with {len(t.experiments)} experiments.")
    if t.experiments:
        # The last list element is treated as the most recent experiment.
        latest = t.experiments[-1]
        print(f"Latest Result: {latest['benchmark']} | {latest['model']} | val={latest['val_l2_rel']:.4f}")
        # NOTE(review): the listing this was recovered from had no indentation;
        # running the analysis only when experiments exist is an assumption -- confirm.
        t.print_analysis()