Spaces:
Sleeping
Sleeping
| """Autonomous architecture experiment framework. | |
| Inspired by Karpathy's autoresearch pattern: | |
| modify config -> train -> evaluate -> keep/discard -> repeat. | |
| """ | |
| import logging | |
| import os | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Callable, Optional | |
| import pandas as pd | |
logger = logging.getLogger(__name__)


class AutoresearchLoop:
    """Run, track, and compare architecture experiments automatically.

    Workflow per experiment:
        1. Apply config modification
        2. Train with modified config
        3. Evaluate
        4. Compare to baseline
        5. Keep if improvement >= threshold, discard otherwise
        6. Log result to the results TSV file

    The comparison uses the *relative* change of ``primary_metric`` against
    the current baseline; higher values are treated as better.
    """

    def __init__(
        self,
        results_path: str = "results.tsv",
        improvement_threshold: float = 0.01,
        primary_metric: str = "sharpe_ratio",
    ):
        """Configure the loop.

        Args:
            results_path: TSV file that every experiment result is appended to.
            improvement_threshold: Minimum relative improvement of the primary
                metric (inclusive) required to keep an experiment.
            primary_metric: Key in the metrics dict used for the comparison.
        """
        self.results_path = results_path
        self.improvement_threshold = improvement_threshold
        self.primary_metric = primary_metric
        # None until set_baseline() is called or the first experiment runs.
        self.baseline_metrics: Optional[dict] = None

    def set_baseline(self, metrics: dict) -> None:
        """Set current best metrics as baseline for comparison.

        Args:
            metrics: dict with at least the primary_metric key.

        Raises:
            ValueError: If ``metrics`` does not contain the primary metric.
        """
        if self.primary_metric not in metrics:
            raise ValueError(
                f"Baseline must include primary metric '{self.primary_metric}'"
            )
        # Store a copy so later mutation by the caller cannot alter the baseline.
        self.baseline_metrics = dict(metrics)
        logger.info(
            "Baseline set: %s=%.4f",
            self.primary_metric,
            metrics[self.primary_metric],
        )

    @staticmethod
    def _relative_improvement(current_val: float, baseline_val: float) -> float:
        """Return the relative change of ``current_val`` versus ``baseline_val``.

        A zero baseline has no meaningful relative change, so the sign of the
        current value is reported as +1.0 / -1.0 / 0.0 instead. Reporting -1.0
        for a negative value keeps a regression from passing a threshold <= 0.
        """
        if baseline_val != 0:
            return (current_val - baseline_val) / abs(baseline_val)
        if current_val > 0:
            return 1.0
        if current_val < 0:
            return -1.0
        return 0.0

    def run_experiment(
        self,
        name: str,
        config_modifier: Callable[[dict], dict],
        train_fn: Callable[[dict], object],
        evaluate_fn: Callable[[object], dict],
        base_config: Optional[dict] = None,
    ) -> dict:
        """Run a single experiment.

        Args:
            name: Experiment name for logging.
            config_modifier: Takes base config dict, returns modified config.
            train_fn: Takes config dict, returns trained model/artifact.
            evaluate_fn: Takes trained artifact, returns metrics dict.
            base_config: Starting config (empty dict if None).

        Returns:
            {name, metrics, kept, improvement}. ``improvement`` is 0.0 for the
            first experiment, which is always kept and becomes the baseline.
        """
        if base_config is None:
            base_config = {}
        # Hand the modifier a shallow copy so the caller's dict is not mutated.
        config = config_modifier(dict(base_config))

        logger.info("Experiment '%s': training...", name)
        artifact = train_fn(config)
        metrics = evaluate_fn(artifact)

        if self.primary_metric not in metrics:
            # The comparison below falls back to 0.0; make that visible.
            logger.warning(
                "Experiment '%s': metrics lack primary metric '%s'; "
                "treating it as 0.0",
                name,
                self.primary_metric,
            )

        improvement = 0.0
        if self.baseline_metrics is None:
            # No baseline — first experiment is always kept
            kept = True
            self.baseline_metrics = dict(metrics)
            logger.info(
                "Experiment '%s': KEPT (first experiment, set as baseline)", name
            )
        else:
            baseline_val = self.baseline_metrics.get(self.primary_metric, 0.0)
            current_val = metrics.get(self.primary_metric, 0.0)
            improvement = self._relative_improvement(current_val, baseline_val)
            kept = improvement >= self.improvement_threshold
            if kept:
                # The kept experiment becomes the new bar to beat.
                self.baseline_metrics = dict(metrics)
                logger.info(
                    "Experiment '%s': KEPT (improvement=%+.4f, %s=%.4f)",
                    name,
                    improvement,
                    self.primary_metric,
                    current_val,
                )
            else:
                logger.info(
                    "Experiment '%s': DISCARDED (improvement=%+.4f < threshold=%s)",
                    name,
                    improvement,
                    self.improvement_threshold,
                )

        result = {
            "name": name,
            "metrics": metrics,
            "kept": kept,
            "improvement": improvement,
        }
        self._log_result(result)
        return result

    def run_experiment_queue(
        self,
        experiments: list[dict],
        base_config: Optional[dict] = None,
    ) -> list[dict]:
        """Run a queue of experiments sequentially.

        Each dict should have keys: name, config_modifier, train_fn, evaluate_fn.
        Every experiment starts from the same ``base_config``; the baseline
        metrics carry over from one experiment to the next.

        Returns:
            List of result dicts, in the order the experiments were given.
        """
        results = []
        for exp in experiments:
            result = self.run_experiment(
                name=exp["name"],
                config_modifier=exp["config_modifier"],
                train_fn=exp["train_fn"],
                evaluate_fn=exp["evaluate_fn"],
                base_config=base_config,
            )
            results.append(result)
        return results

    @staticmethod
    def _read_header(path: Path) -> Optional[list]:
        """Return the column names of an existing TSV, or None if it has none."""
        if not path.exists() or path.stat().st_size == 0:
            return None
        try:
            return list(pd.read_csv(path, sep="\t", nrows=0).columns)
        except pd.errors.EmptyDataError:
            # File holds only whitespace/newlines: treat it as not yet started.
            return None

    def _log_result(self, result: dict) -> None:
        """Append experiment result to TSV file.

        Metric values are flattened into ``metric_<key>`` columns. The row is
        aligned to the header already on disk; if it introduces columns the
        file does not have yet, the file is rewritten with the widened header
        so every row stays parseable.
        """
        path = Path(self.results_path)

        flat = {
            "timestamp": datetime.now().isoformat(timespec="seconds"),
            "name": result["name"],
            "kept": result["kept"],
            "improvement": f"{result['improvement']:.6f}",
        }
        for key, value in result["metrics"].items():
            flat[f"metric_{key}"] = (
                f"{value:.6f}" if isinstance(value, float) else str(value)
            )
        row = pd.DataFrame([flat])

        # Allow results_path to point into a directory that does not exist yet.
        path.parent.mkdir(parents=True, exist_ok=True)

        header = self._read_header(path)
        if header is None:
            row.to_csv(path, sep="\t", index=False)
            return

        if set(row.columns) <= set(header):
            # Same or narrower schema: order the fields like the header and
            # leave any metric this experiment did not report empty.
            row.reindex(columns=header).to_csv(
                path, sep="\t", mode="a", header=False, index=False
            )
            return

        # New metric columns appeared: rewrite with the union of columns.
        # Read everything as text so previously formatted values are untouched.
        try:
            existing = pd.read_csv(
                path, sep="\t", dtype=str, keep_default_na=False
            )
        except pd.errors.ParserError:
            # Best effort on an already-malformed file: do not lose the result.
            logger.warning(
                "Could not parse '%s' to widen its header; appending row as-is",
                path,
            )
            row.to_csv(path, sep="\t", mode="a", header=False, index=False)
            return
        combined = pd.concat([existing, row], ignore_index=True)
        combined.to_csv(path, sep="\t", index=False)

    def load_results(self) -> pd.DataFrame:
        """Load experiment results from TSV.

        Returns:
            The logged results, or an empty DataFrame when the file is missing
            or contains no data.
        """
        path = Path(self.results_path)
        if not path.exists() or path.stat().st_size == 0:
            return pd.DataFrame()
        try:
            return pd.read_csv(path, sep="\t")
        except pd.errors.EmptyDataError:
            return pd.DataFrame()