"""Flask blueprint exposing a JSON CRUD API for experiments and their children.

Four collections are managed: ``experiments``, ``runs``, ``sub_experiments``
and ``experiment_notes``.  Each collection is a list of dicts that is

* cached in memory (``_cache``),
* mirrored to ``<LOCAL_DATA_DIR>/<name>.json``, and
* uploaded in a background thread to the Hugging Face dataset repo
  ``DASHBOARD_REPO``.
"""
import json
import os
import uuid
import tempfile
import threading
from datetime import datetime, timezone

from flask import Blueprint, request, jsonify

bp = Blueprint("experiments", __name__, url_prefix="/api/experiments")

DASHBOARD_REPO = "reasoning-degeneration-dev/RESEARCH_DASHBOARD"
LOCAL_DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")

# In-memory copy of each collection, keyed by collection name.
_cache: dict[str, list[dict]] = {}
# Names of collections that have already been loaded into ``_cache``.
_cache_loaded: set[str] = set()
# Guards ``_cache`` and ``_cache_loaded``.  Not reentrant.
_lock = threading.Lock()

FILES = ["experiments", "runs", "sub_experiments", "experiment_notes"]


def _ensure_local_dir():
    """Create ``LOCAL_DATA_DIR`` if it does not exist yet."""
    os.makedirs(LOCAL_DATA_DIR, exist_ok=True)


def _local_path(name: str) -> str:
    """Return the local JSON path for collection *name*, creating the data dir."""
    _ensure_local_dir()
    return os.path.join(LOCAL_DATA_DIR, f"{name}.json")


def _download_file(name: str) -> list[dict]:
    """Load collection *name* from the HF dataset repo, falling back to disk.

    On a successful download the data is also written to the local mirror.
    If anything in the download path fails, the local mirror is read instead;
    if there is no local mirror either, an empty list is returned.
    """
    try:
        from huggingface_hub import hf_hub_download

        path = hf_hub_download(
            DASHBOARD_REPO,
            f"{name}.json",
            repo_type="dataset",
        )
        with open(path) as f:
            data = json.load(f)
        with open(_local_path(name), "w") as f:
            json.dump(data, f, indent=2)
        return data
    except Exception:
        # Deliberate best-effort: any download problem degrades to local data.
        local = _local_path(name)
        if os.path.exists(local):
            with open(local) as f:
                return json.load(f)
        return []


def _upload_file(name: str, data: list[dict]):
    """Persist *data* locally and push it to the HF repo in a daemon thread.

    The JSON payload is serialised once, up front, so the background upload
    sends exactly what was written locally even if *data* is mutated by a
    later request before the thread runs.
    """
    payload = json.dumps(data, indent=2)
    with open(_local_path(name), "w") as f:
        f.write(payload)

    def _do_upload():
        tmp = None
        try:
            from huggingface_hub import HfApi

            api = HfApi()
            try:
                api.create_repo(DASHBOARD_REPO, repo_type="dataset", exist_ok=True)
            except Exception:
                # Repo creation is best-effort; the upload below reports
                # any real problem.
                pass
            with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
                tmp = f.name
                f.write(payload)
            api.upload_file(
                path_or_fileobj=tmp,
                path_in_repo=f"{name}.json",
                repo_id=DASHBOARD_REPO,
                repo_type="dataset",
            )
        except Exception as e:
            print(f"[experiments] HF upload failed for {name}: {e}")
        finally:
            # Remove the temp file even when the upload failed.
            if tmp is not None:
                try:
                    os.unlink(tmp)
                except OSError:
                    pass

    threading.Thread(target=_do_upload, daemon=True).start()


def _get(name: str) -> list[dict]:
    """Return a shallow copy of collection *name*, loading it on first use.

    The returned list is new, but the dicts inside it are the cached objects.
    """
    with _lock:
        if name not in _cache_loaded:
            _cache[name] = _download_file(name)
            _cache_loaded.add(name)
        return list(_cache.get(name, []))


def _set(name: str, data: list[dict]):
    """Replace cached collection *name* with *data* and persist it."""
    with _lock:
        _cache[name] = data
        _cache_loaded.add(name)
    # Persist outside the lock so file I/O does not block other requests.
    _upload_file(name, data)


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _json_object() -> dict:
    """Return the request's JSON body as a dict (``{}`` if it is not an object)."""
    body = request.get_json()
    return body if isinstance(body, dict) else {}


def _touch_experiment(experiments: list[dict], exp_id: str):
    """Set ``updated`` on the experiment with *exp_id* and save the collection."""
    stamp = _now()
    for exp in experiments:
        if exp["id"] == exp_id:
            exp["updated"] = stamp
    _set("experiments", experiments)


def _normalize_hypothesis(hypothesis) -> dict:
    """Coerce an imported hypothesis (dict or anything else) to the stored shape."""
    if isinstance(hypothesis, dict):
        return {
            "statement": hypothesis.get("statement", ""),
            "type": hypothesis.get("type", "exploration"),
            "status": hypothesis.get("status", "pending"),
            "success_criteria": hypothesis.get("success_criteria", ""),
        }
    return {
        "statement": str(hypothesis),
        "type": "exploration",
        "status": "pending",
        "success_criteria": "",
    }


# --- Experiments CRUD ---

@bp.route("/", methods=["GET"])
def list_experiments():
    """Return every experiment, enriched with run/sub/note counts."""
    experiments = _get("experiments")
    runs = _get("runs")
    subs = _get("sub_experiments")
    notes = _get("experiment_notes")

    # Enrich with counts
    result = []
    for exp in experiments:
        exp_runs = [r for r in runs if r.get("experiment_id") == exp["id"]]
        exp_subs = [s for s in subs if s.get("experiment_id") == exp["id"]]
        exp_notes = [n for n in notes if n.get("experiment_id") == exp["id"]]
        result.append({
            **exp,
            "run_count": len(exp_runs),
            "sub_count": len(exp_subs),
            "note_count": len(exp_notes),
        })
    return jsonify(result)


@bp.route("/", methods=["POST"])
def create_experiment():
    """Create an experiment; 400 without a name, 409 if the id already exists."""
    data = _json_object()
    name = data.get("name", "").strip()
    if not name:
        return jsonify({"error": "name is required"}), 400

    exp_id = data.get("id", name.lower().replace(" ", "_"))
    experiments = _get("experiments")
    if any(e["id"] == exp_id for e in experiments):
        return jsonify({"error": f"Experiment '{exp_id}' already exists"}), 409

    stamp = _now()
    experiment = {
        "id": exp_id,
        "name": name,
        "research_project": data.get("research_project", ""),
        "hypothesis": data.get("hypothesis", {
            "statement": "",
            "type": "exploration",
            "status": "pending",
            "success_criteria": "",
        }),
        "stage": data.get("stage", "idea"),
        "completeness": data.get("completeness", 0),
        "models": data.get("models", []),
        "tasks": data.get("tasks", []),
        "tags": data.get("tags", []),
        "hf_repos": data.get("hf_repos", []),
        "wandb_url": data.get("wandb_url", ""),
        "notes": data.get("notes", ""),
        "created": stamp,
        "updated": stamp,
    }
    experiments.append(experiment)
    _set("experiments", experiments)
    return jsonify(experiment), 201


@bp.route("/<exp_id>", methods=["GET"])
def get_experiment(exp_id):
    """Return one experiment with its runs, sub-experiments and notes, or 404."""
    experiments = _get("experiments")
    exp = next((e for e in experiments if e["id"] == exp_id), None)
    if not exp:
        return jsonify({"error": "not found"}), 404

    runs = [r for r in _get("runs") if r.get("experiment_id") == exp_id]
    subs = [s for s in _get("sub_experiments") if s.get("experiment_id") == exp_id]
    notes = [n for n in _get("experiment_notes") if n.get("experiment_id") == exp_id]
    return jsonify({**exp, "runs": runs, "sub_experiments": subs, "experiment_notes": notes})


@bp.route("/<exp_id>", methods=["PUT"])
def update_experiment(exp_id):
    """Update the editable fields of an experiment, or 404 if it is unknown."""
    data = _json_object()
    experiments = _get("experiments")
    for exp in experiments:
        if exp["id"] == exp_id:
            for key in ["name", "research_project", "hypothesis", "stage",
                        "completeness", "models", "tasks", "tags",
                        "hf_repos", "wandb_url", "notes"]:
                if key in data:
                    exp[key] = data[key]
            exp["updated"] = _now()
            _set("experiments", experiments)
            return jsonify(exp)
    return jsonify({"error": "not found"}), 404


@bp.route("/<exp_id>", methods=["DELETE"])
def delete_experiment(exp_id):
    """Delete an experiment and every run, sub-experiment and note linked to it.

    Always answers ``{"status": "ok"}``, including when the id is unknown.
    """
    experiments = _get("experiments")
    experiments = [e for e in experiments if e["id"] != exp_id]
    _set("experiments", experiments)

    # Also delete associated runs, subs, and notes
    for collection in ("runs", "sub_experiments", "experiment_notes"):
        kept = [row for row in _get(collection) if row.get("experiment_id") != exp_id]
        _set(collection, kept)

    return jsonify({"status": "ok"})


# --- Run records ---

@bp.route("/<exp_id>/runs", methods=["POST"])
def create_run(exp_id):
    """Attach a new run record to an experiment; 404 if the experiment is unknown."""
    experiments = _get("experiments")
    if not any(e["id"] == exp_id for e in experiments):
        return jsonify({"error": "experiment not found"}), 404

    data = _json_object()
    run = {
        "id": data.get("id", f"run_{uuid.uuid4().hex[:8]}"),
        "experiment_id": exp_id,
        "condition": data.get("condition", ""),
        "model": data.get("model", ""),
        "cluster": data.get("cluster", ""),
        "status": data.get("status", "completed"),
        "hf_dataset": data.get("hf_dataset", ""),
        "metrics": data.get("metrics", {}),
        "timestamp": data.get("timestamp", _now()),
        "notes": data.get("notes", ""),
    }
    runs = _get("runs")
    runs.append(run)
    _set("runs", runs)

    # Touch experiment updated timestamp
    _touch_experiment(experiments, exp_id)

    return jsonify(run), 201


@bp.route("/<exp_id>/runs/<run_id>", methods=["PUT"])
def update_run(exp_id, run_id):
    """Update the editable fields of a run, or 404 if it is unknown."""
    data = _json_object()
    runs = _get("runs")
    for run in runs:
        if run["id"] == run_id and run["experiment_id"] == exp_id:
            for key in ["condition", "model", "cluster", "status",
                        "hf_dataset", "metrics", "notes"]:
                if key in data:
                    run[key] = data[key]
            _set("runs", runs)
            return jsonify(run)
    return jsonify({"error": "not found"}), 404


@bp.route("/<exp_id>/runs/<run_id>", methods=["DELETE"])
def delete_run(exp_id, run_id):
    """Delete a run; answers ``{"status": "ok"}`` whether or not it existed."""
    runs = _get("runs")
    runs = [r for r in runs if not (r["id"] == run_id and r["experiment_id"] == exp_id)]
    _set("runs", runs)
    return jsonify({"status": "ok"})


# --- Sub-experiments ---

@bp.route("/<exp_id>/subs", methods=["POST"])
def create_sub(exp_id):
    """Create a sub-experiment; 404 for unknown experiment, 400 without a name."""
    experiments = _get("experiments")
    if not any(e["id"] == exp_id for e in experiments):
        return jsonify({"error": "experiment not found"}), 404

    data = _json_object()
    name = data.get("name", "").strip()
    if not name:
        return jsonify({"error": "name is required"}), 400

    sub_id = data.get("id", f"{exp_id}__{name.lower().replace(' ', '_')}")
    stamp = _now()
    sub = {
        "id": sub_id,
        "experiment_id": exp_id,
        "name": name,
        "hypothesis": data.get("hypothesis", ""),
        "status": data.get("status", "active"),
        "content_md": data.get("content_md", ""),
        "hf_repos": data.get("hf_repos", []),
        "created": stamp,
        "updated": stamp,
    }
    subs = _get("sub_experiments")
    subs.append(sub)
    _set("sub_experiments", subs)

    # Touch experiment updated timestamp
    _touch_experiment(experiments, exp_id)

    return jsonify(sub), 201


@bp.route("/<exp_id>/subs/<sub_id>", methods=["PUT"])
def update_sub(exp_id, sub_id):
    """Update the editable fields of a sub-experiment, or 404 if it is unknown."""
    data = _json_object()
    subs = _get("sub_experiments")
    for sub in subs:
        if sub["id"] == sub_id and sub["experiment_id"] == exp_id:
            for key in ["name", "hypothesis", "status", "content_md", "hf_repos"]:
                if key in data:
                    sub[key] = data[key]
            sub["updated"] = _now()
            _set("sub_experiments", subs)
            return jsonify(sub)
    return jsonify({"error": "not found"}), 404


@bp.route("/<exp_id>/subs/<sub_id>", methods=["DELETE"])
def delete_sub(exp_id, sub_id):
    """Delete a sub-experiment; answers ``{"status": "ok"}`` either way."""
    subs = _get("sub_experiments")
    subs = [s for s in subs if not (s["id"] == sub_id and s["experiment_id"] == exp_id)]
    _set("sub_experiments", subs)
    return jsonify({"status": "ok"})


# --- Experiment Notes ---

@bp.route("/<exp_id>/notes", methods=["POST"])
def create_note(exp_id):
    """Create a note; 404 for unknown experiment, 400 without a title."""
    experiments = _get("experiments")
    if not any(e["id"] == exp_id for e in experiments):
        return jsonify({"error": "experiment not found"}), 404

    data = _json_object()
    title = data.get("title", "").strip()
    if not title:
        return jsonify({"error": "title is required"}), 400

    note_id = data.get("id", f"{exp_id}__note_{uuid.uuid4().hex[:8]}")
    stamp = _now()
    note = {
        "id": note_id,
        "experiment_id": exp_id,
        "title": title,
        "filename": data.get("filename", ""),
        "content_md": data.get("content_md", ""),
        "created": stamp,
        "updated": stamp,
    }
    notes = _get("experiment_notes")
    notes.append(note)
    _set("experiment_notes", notes)
    return jsonify(note), 201


@bp.route("/<exp_id>/notes/<note_id>", methods=["GET"])
def get_note(exp_id, note_id):
    """Return a single note, or 404 if it is unknown."""
    notes = _get("experiment_notes")
    note = next(
        (n for n in notes if n["id"] == note_id and n["experiment_id"] == exp_id),
        None,
    )
    if not note:
        return jsonify({"error": "not found"}), 404
    return jsonify(note)


@bp.route("/<exp_id>/notes/<note_id>", methods=["PUT"])
def update_note(exp_id, note_id):
    """Update a note's title and/or markdown content, or 404 if it is unknown."""
    data = _json_object()
    notes = _get("experiment_notes")
    for note in notes:
        if note["id"] == note_id and note["experiment_id"] == exp_id:
            for key in ["title", "content_md"]:
                if key in data:
                    note[key] = data[key]
            note["updated"] = _now()
            _set("experiment_notes", notes)
            return jsonify(note)
    return jsonify({"error": "not found"}), 404


@bp.route("/<exp_id>/notes/<note_id>", methods=["DELETE"])
def delete_note(exp_id, note_id):
    """Delete a note; answers ``{"status": "ok"}`` whether or not it existed."""
    notes = _get("experiment_notes")
    notes = [n for n in notes if not (n["id"] == note_id and n["experiment_id"] == exp_id)]
    _set("experiment_notes", notes)
    return jsonify({"status": "ok"})


# --- Sync & Import ---

@bp.route("/sync", methods=["POST"])
def sync():
    """Drop the in-memory cache and reload every collection in ``FILES``."""
    with _lock:
        _cache.clear()
        _cache_loaded.clear()
    # Reload outside the lock: ``_get`` acquires ``_lock`` itself and the
    # lock is not reentrant.
    for name in FILES:
        _get(name)
    return jsonify({"status": "ok"})


@bp.route("/import", methods=["POST"])
def import_experiments():
    """Bulk import from experiment.yaml format (as produced by exp-runner).

    The body is one item or a list of items.  An item's id is derived from its
    ``name``; items that are not objects or have no name are skipped.
    Experiments that do not exist yet are created, runs not yet recorded for
    the experiment are appended, and each new run's ``hf_dataset`` is added to
    the experiment's ``hf_repos`` when it is not already listed.
    """
    data = request.get_json()
    items = data if isinstance(data, list) else [data]
    imported = []

    experiments = _get("experiments")
    runs = _get("runs")
    subs = _get("sub_experiments")
    existing_ids = {e["id"] for e in experiments}

    for item in items:
        if not isinstance(item, dict):
            continue
        exp_id = item.get("name", "").lower().replace(" ", "_").replace("-", "_")
        if not exp_id:
            continue

        models = item.get("models", [])
        model_names = [m.get("id", "") if isinstance(m, dict) else str(m) for m in models]

        if exp_id not in existing_ids:
            observability = item.get("observability")
            experiment = {
                "id": exp_id,
                "name": item.get("name", exp_id),
                "research_project": item.get("research_project", ""),
                "hypothesis": _normalize_hypothesis(item.get("hypothesis", {})),
                "stage": "active",
                "completeness": 0,
                "models": model_names,
                "tasks": [],
                "tags": observability.get("tags", []) if isinstance(observability, dict) else [],
                "hf_repos": [],
                "wandb_url": "",
                "notes": "",
                "created": item.get("created", _now()),
                "updated": _now(),
            }
            experiments.append(experiment)
            existing_ids.add(exp_id)

        # Import runs
        for run_data in item.get("runs") or []:
            if not isinstance(run_data, dict):
                continue
            run_id = run_data.get("run_id", f"run_{uuid.uuid4().hex[:8]}")
            if any(r["id"] == run_id and r["experiment_id"] == exp_id for r in runs):
                continue
            run = {
                "id": run_id,
                "experiment_id": exp_id,
                "condition": run_data.get("condition", ""),
                "model": run_data.get("model", ""),
                "cluster": run_data.get("cluster", ""),
                "status": run_data.get("status", "completed"),
                "hf_dataset": run_data.get("hf_dataset", ""),
                "metrics": run_data.get("metrics", {}),
                "timestamp": run_data.get("timestamp", _now()),
                "notes": run_data.get("notes", ""),
            }
            runs.append(run)

            # Add HF repo to experiment if present
            if run.get("hf_dataset"):
                for exp in experiments:
                    if exp["id"] != exp_id:
                        continue
                    # Entries are normally {"repo": ...} dicts; tolerate
                    # plain values supplied through the create/update API.
                    existing_repos = {
                        r.get("repo") if isinstance(r, dict) else r
                        for r in exp.get("hf_repos", [])
                    }
                    if run["hf_dataset"] not in existing_repos:
                        timestamp = run["timestamp"]
                        exp.setdefault("hf_repos", []).append({
                            "repo": run["hf_dataset"],
                            "description": f"{run['condition']} - {run['model']}",
                            "date": str(timestamp)[:10] if timestamp else "",
                        })

        imported.append(exp_id)

    _set("experiments", experiments)
    _set("runs", runs)
    # ``subs`` is not modified by the import; it is written back as loaded.
    _set("sub_experiments", subs)

    return jsonify({"imported": imported, "count": len(imported)})