Spaces:
Paused
Paused
| from __future__ import annotations | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from statistics import fmean | |
| import json | |
| DEFAULT_FRONTIER_MODELS: tuple[str, ...] = ( | |
| "uniform", | |
| "structured_runtime", | |
| "structured_v2", | |
| "structured_v2_copy", | |
| "structured_fast", | |
| "structured_copy", | |
| ) | |
| class ExperimentSpec: | |
| name: str | |
| dataset: str | |
| models: tuple[str, ...] | |
| steps: int | |
| batch_size: int | |
| seq_len: int | |
| eval_every: int | |
| eval_steps: int | |
| train_rows: int = 1000 | |
| val_rows: int = 100 | |
| time_budget_s: float = 0.0 | |
| code_repo: str = "bigcode/the-stack-smol-xs" | |
| code_lang: str = "python" | |
| code_bytes: int = 1_200_000 | |
| seed: int = 42 | |
| def dataset_key(self) -> str: | |
| if self.dataset == "code": | |
| return f"code::{self.code_repo}::{self.code_lang}::{self.code_bytes}" | |
| return f"tinystories::{self.train_rows}::{self.val_rows}" | |
| def budget_key(self) -> str: | |
| if self.time_budget_s > 0.0: | |
| rounded = int(round(self.time_budget_s)) | |
| return f"time::{rounded}s" | |
| return f"steps::{self.steps}" | |
| def slug(self) -> str: | |
| if self.time_budget_s > 0.0: | |
| rounded = int(round(self.time_budget_s)) | |
| suffix = f"time_{rounded}s" | |
| else: | |
| suffix = f"steps_{self.steps}" | |
| return f"{self.name}_{suffix}_seed_{self.seed}" | |
| def as_dict(self) -> dict[str, object]: | |
| return asdict(self) | |
| class SelectionDecision: | |
| spec: ExperimentSpec | |
| pending_models: tuple[str, ...] | |
| score: float | |
| reason: str | |
| def build_frontier_specs( | |
| frontier_models: tuple[str, ...] = DEFAULT_FRONTIER_MODELS, | |
| tinystories_steps: int = 150, | |
| code_steps: int = 100, | |
| time_budget_s: float = 20.0, | |
| batch_size: int = 4, | |
| seq_len: int = 64, | |
| eval_every: int = 25, | |
| eval_steps: int = 5, | |
| train_rows: int = 1000, | |
| val_rows: int = 100, | |
| code_repo: str = "bigcode/the-stack-smol-xs", | |
| code_lang: str = "python", | |
| code_bytes: int = 1_200_000, | |
| seeds: tuple[int, ...] = (42,), | |
| ) -> list[ExperimentSpec]: | |
| specs: list[ExperimentSpec] = [] | |
| for seed in seeds: | |
| specs.extend( | |
| [ | |
| ExperimentSpec( | |
| name="tinystories_equal_step_frontier", | |
| dataset="tinystories", | |
| models=frontier_models, | |
| steps=tinystories_steps, | |
| batch_size=batch_size, | |
| seq_len=seq_len, | |
| eval_every=eval_every, | |
| eval_steps=eval_steps, | |
| train_rows=train_rows, | |
| val_rows=val_rows, | |
| seed=seed, | |
| ), | |
| ExperimentSpec( | |
| name="tinystories_equal_time_frontier", | |
| dataset="tinystories", | |
| models=frontier_models, | |
| steps=9999, | |
| batch_size=batch_size, | |
| seq_len=seq_len, | |
| eval_every=eval_every, | |
| eval_steps=eval_steps, | |
| train_rows=train_rows, | |
| val_rows=val_rows, | |
| time_budget_s=time_budget_s, | |
| seed=seed, | |
| ), | |
| ExperimentSpec( | |
| name="code_equal_step_frontier", | |
| dataset="code", | |
| models=frontier_models, | |
| steps=code_steps, | |
| batch_size=batch_size, | |
| seq_len=seq_len, | |
| eval_every=eval_every, | |
| eval_steps=eval_steps, | |
| code_repo=code_repo, | |
| code_lang=code_lang, | |
| code_bytes=code_bytes, | |
| seed=seed, | |
| ), | |
| ExperimentSpec( | |
| name="code_equal_time_frontier", | |
| dataset="code", | |
| models=frontier_models, | |
| steps=9999, | |
| batch_size=batch_size, | |
| seq_len=seq_len, | |
| eval_every=eval_every, | |
| eval_steps=eval_steps, | |
| time_budget_s=time_budget_s, | |
| code_repo=code_repo, | |
| code_lang=code_lang, | |
| code_bytes=code_bytes, | |
| seed=seed, | |
| ), | |
| ] | |
| ) | |
| return specs | |
| def is_benchmark_result(payload: object) -> bool: | |
| if not isinstance(payload, dict): | |
| return False | |
| if "dataset" not in payload or "runtime" not in payload: | |
| return False | |
| dataset = payload.get("dataset") | |
| runtime = payload.get("runtime") | |
| return isinstance(dataset, dict) and isinstance(runtime, dict) | |
| def load_benchmark_results(results_dir: Path) -> list[dict[str, object]]: | |
| loaded: list[dict[str, object]] = [] | |
| for path in sorted(results_dir.rglob("*.json")): | |
| try: | |
| payload = json.loads(path.read_text(encoding="utf-8")) | |
| except (OSError, json.JSONDecodeError): | |
| continue | |
| if not is_benchmark_result(payload): | |
| continue | |
| item = dict(payload) | |
| item["__path__"] = str(path) | |
| loaded.append(item) | |
| return loaded | |
| def result_dataset_key(result: dict[str, object]) -> str: | |
| dataset = result["dataset"] | |
| assert isinstance(dataset, dict) | |
| dataset_name = str(dataset.get("name", "")) | |
| if dataset_name == "the-stack-bpe": | |
| return ( | |
| f"code::{dataset.get('repo_id')}::{dataset.get('lang')}::" | |
| f"{dataset.get('target_bytes')}" | |
| ) | |
| return f"tinystories::{dataset.get('train_rows')}::{dataset.get('val_rows')}" | |
| def result_budget_key(result: dict[str, object]) -> str: | |
| runtime = result["runtime"] | |
| assert isinstance(runtime, dict) | |
| time_budget_s = float(runtime.get("time_budget_s", 0.0)) | |
| if time_budget_s > 0.0: | |
| rounded = int(round(time_budget_s)) | |
| return f"time::{rounded}s" | |
| return f"steps::{int(runtime.get('steps', 0))}" | |
| def result_models(result: dict[str, object]) -> tuple[str, ...]: | |
| runtime = result["runtime"] | |
| assert isinstance(runtime, dict) | |
| models = runtime.get("models", []) | |
| return tuple(str(model) for model in models) | |
| def result_covers_spec(result: dict[str, object], spec: ExperimentSpec) -> bool: | |
| runtime = result["runtime"] | |
| assert isinstance(runtime, dict) | |
| return ( | |
| result_dataset_key(result) == spec.dataset_key() | |
| and result_budget_key(result) == spec.budget_key() | |
| and int(runtime.get("seed", 42)) == spec.seed | |
| ) | |
| def find_pending_models(spec: ExperimentSpec, results: list[dict[str, object]]) -> tuple[str, ...]: | |
| pending: list[str] = [] | |
| for model in spec.models: | |
| if model == "uniform": | |
| continue | |
| covered = any(result_covers_spec(result, spec) and model in result_models(result) for result in results) | |
| if not covered: | |
| pending.append(model) | |
| return tuple(pending) | |
| def _safe_float(value: object, default: float = 0.0) -> float: | |
| try: | |
| return float(value) | |
| except (TypeError, ValueError): | |
| return default | |
| def model_relative_score(result: dict[str, object], model_name: str) -> float | None: | |
| baseline = result.get("uniform") | |
| model = result.get(model_name) | |
| if not isinstance(baseline, dict) or not isinstance(model, dict): | |
| return None | |
| baseline_best = baseline.get("best_by_val_loss") | |
| model_best = model.get("best_by_val_loss") | |
| baseline_acc = baseline.get("best_by_val_acc") | |
| model_acc = model.get("best_by_val_acc") | |
| baseline_final = baseline.get("final") | |
| model_final = model.get("final") | |
| if not all(isinstance(item, dict) for item in (baseline_best, model_best, baseline_acc, model_acc, baseline_final, model_final)): | |
| return None | |
| loss_gain = _safe_float(baseline_best.get("val_loss")) - _safe_float(model_best.get("val_loss")) | |
| acc_gain = _safe_float(model_acc.get("val_accuracy")) - _safe_float(baseline_acc.get("val_accuracy")) | |
| baseline_speed = _safe_float(baseline_final.get("train_tokens_per_s"), 1.0) | |
| model_speed = _safe_float(model_final.get("train_tokens_per_s"), 0.0) | |
| speed_ratio = model_speed / max(baseline_speed, 1e-8) | |
| baseline_stability = _safe_float(baseline_final.get("val_loss")) - _safe_float(baseline_best.get("val_loss")) | |
| model_stability = _safe_float(model_final.get("val_loss")) - _safe_float(model_best.get("val_loss")) | |
| stability_gain = baseline_stability - model_stability | |
| return 4.0 * loss_gain + 2.0 * acc_gain + 0.5 * (speed_ratio - 1.0) + stability_gain | |
| def aggregate_model_promise( | |
| results: list[dict[str, object]], | |
| frontier_models: tuple[str, ...] = DEFAULT_FRONTIER_MODELS, | |
| ) -> dict[str, float]: | |
| aggregated: dict[str, list[float]] = {model: [] for model in frontier_models if model != "uniform"} | |
| for result in results: | |
| for model in aggregated: | |
| score = model_relative_score(result, model) | |
| if score is not None: | |
| aggregated[model].append(score) | |
| return { | |
| model: fmean(scores) if scores else 0.0 | |
| for model, scores in aggregated.items() | |
| } | |
| def coverage_counts(results: list[dict[str, object]]) -> dict[str, int]: | |
| counts = {"tinystories": 0, "code": 0, "time": 0, "steps": 0} | |
| for result in results: | |
| dataset_key = result_dataset_key(result) | |
| budget_key = result_budget_key(result) | |
| if dataset_key.startswith("code::"): | |
| counts["code"] += 1 | |
| else: | |
| counts["tinystories"] += 1 | |
| if budget_key.startswith("time::"): | |
| counts["time"] += 1 | |
| else: | |
| counts["steps"] += 1 | |
| return counts | |
| def choose_next_experiment( | |
| specs: list[ExperimentSpec], | |
| results: list[dict[str, object]], | |
| ) -> SelectionDecision | None: | |
| decisions = rank_candidate_experiments(specs=specs, results=results) | |
| return decisions[0] if decisions else None | |
| def rank_candidate_experiments( | |
| specs: list[ExperimentSpec], | |
| results: list[dict[str, object]], | |
| ) -> list[SelectionDecision]: | |
| promise = aggregate_model_promise(results) | |
| counts = coverage_counts(results) | |
| recent_names: list[str] = [] | |
| for result in results[-12:]: | |
| runtime = result.get("runtime") | |
| dataset = result.get("dataset") | |
| if not isinstance(runtime, dict) or not isinstance(dataset, dict): | |
| continue | |
| if str(dataset.get("name")) == "the-stack-bpe": | |
| prefix = "code" | |
| else: | |
| prefix = "tinystories" | |
| mode = "time" if float(runtime.get("time_budget_s", 0.0)) > 0.0 else "steps" | |
| recent_names.append(f"{prefix}_{mode}") | |
| decisions: list[SelectionDecision] = [] | |
| for spec in specs: | |
| pending = find_pending_models(spec, results) | |
| if not pending: | |
| continue | |
| recent_penalty = 0.25 if f"{spec.dataset}_{'time' if spec.time_budget_s > 0.0 else 'steps'}" in recent_names[-4:] else 0.0 | |
| dataset_bonus = 0.35 if ( | |
| spec.dataset == "code" and counts["code"] <= counts["tinystories"] | |
| ) or ( | |
| spec.dataset == "tinystories" and counts["tinystories"] < counts["code"] | |
| ) else 0.0 | |
| budget_bonus = 0.2 if ( | |
| spec.time_budget_s > 0.0 and counts["time"] <= counts["steps"] | |
| ) or ( | |
| spec.time_budget_s == 0.0 and counts["steps"] < counts["time"] | |
| ) else 0.0 | |
| promise_bonus = sum(max(0.0, promise.get(model, 0.0)) for model in pending) | |
| score = 3.0 * len(pending) + promise_bonus + dataset_bonus + budget_bonus - recent_penalty | |
| reason = ( | |
| f"pending={', '.join(pending)}; " | |
| f"dataset_bonus={dataset_bonus:.2f}; " | |
| f"budget_bonus={budget_bonus:.2f}; " | |
| f"promise_bonus={promise_bonus:.3f}; " | |
| f"recent_penalty={recent_penalty:.2f}" | |
| ) | |
| decisions.append( | |
| SelectionDecision( | |
| spec=spec, | |
| pending_models=pending, | |
| score=score, | |
| reason=reason, | |
| ) | |
| ) | |
| decisions.sort( | |
| key=lambda decision: ( | |
| decision.score, | |
| len(decision.pending_models), | |
| 1 if decision.spec.dataset == "code" else 0, | |
| 1 if decision.spec.time_budget_s > 0.0 else 0, | |
| -decision.spec.seed, | |
| ), | |
| reverse=True, | |
| ) | |
| return decisions | |
| def build_command( | |
| spec: ExperimentSpec, | |
| runner_path: Path, | |
| output_path: Path, | |
| python_executable: str, | |
| ) -> list[str]: | |
| command = [ | |
| python_executable, | |
| str(runner_path), | |
| "--dataset", | |
| spec.dataset, | |
| "--steps", | |
| str(spec.steps), | |
| "--batch_size", | |
| str(spec.batch_size), | |
| "--seq_len", | |
| str(spec.seq_len), | |
| "--eval_every", | |
| str(spec.eval_every), | |
| "--eval_steps", | |
| str(spec.eval_steps), | |
| "--seed", | |
| str(spec.seed), | |
| "--output", | |
| str(output_path), | |
| "--models", | |
| *spec.models, | |
| ] | |
| if spec.time_budget_s > 0.0: | |
| command.extend(["--time_budget_s", str(spec.time_budget_s)]) | |
| if spec.dataset == "code": | |
| command.extend( | |
| [ | |
| "--code_repo", | |
| spec.code_repo, | |
| "--code_lang", | |
| spec.code_lang, | |
| "--code_bytes", | |
| str(spec.code_bytes), | |
| ] | |
| ) | |
| else: | |
| command.extend( | |
| [ | |
| "--train_rows", | |
| str(spec.train_rows), | |
| "--val_rows", | |
| str(spec.val_rows), | |
| ] | |
| ) | |
| return command | |
| def build_global_leaderboard( | |
| results: list[dict[str, object]], | |
| frontier_models: tuple[str, ...] = DEFAULT_FRONTIER_MODELS, | |
| ) -> list[dict[str, object]]: | |
| rows: list[dict[str, object]] = [] | |
| promise = aggregate_model_promise(results, frontier_models=frontier_models) | |
| for model in frontier_models: | |
| if model == "uniform": | |
| continue | |
| coverage = 0 | |
| exact_hits: list[str] = [] | |
| for result in results: | |
| if model in result_models(result): | |
| coverage += 1 | |
| exact_hits.append(f"{result_dataset_key(result)}|{result_budget_key(result)}") | |
| rows.append( | |
| { | |
| "model": model, | |
| "mean_relative_score": promise.get(model, 0.0), | |
| "coverage_runs": coverage, | |
| "coverage_keys": sorted(set(exact_hits)), | |
| } | |
| ) | |
| rows.sort(key=lambda row: (float(row["mean_relative_score"]), int(row["coverage_runs"])), reverse=True) | |
| return rows | |
| def leaderboard_markdown(rows: list[dict[str, object]]) -> str: | |
| header = [ | |
| "| Model | Mean Relative Score | Coverage Runs | Coverage Keys |", | |
| "|---|---:|---:|---|", | |
| ] | |
| body: list[str] = [] | |
| for row in rows: | |
| coverage_keys = "<br>".join(str(item) for item in row["coverage_keys"]) | |
| body.append( | |
| f"| {row['model']} | {float(row['mean_relative_score']):+.4f} | " | |
| f"{int(row['coverage_runs'])} | {coverage_keys} |" | |
| ) | |
| return "\n".join(header + body) | |