"""Import all experiment data from local files into the Research Dashboard HF repo.""" import json import os import re import tempfile import uuid import yaml from pathlib import Path from huggingface_hub import HfApi EXPERIMENTS_DIR = Path("/Users/rs2020/Research/notes/experiments") DASHBOARD_REPO = "reasoning-degeneration-dev/RESEARCH_DASHBOARD" # Experiments to exclude from dashboard import. All others are auto-discovered. EXCLUDED_EXPERIMENTS: set[str] = set() STAGE_MAP = { "supported": "concluded", "invalidated": "concluded", "inconclusive": "inconclusive", "exploring": "active", "active": "active", "pending": "planned", } def compute_completeness(exp_dir: Path, config: dict) -> int: score = 0 if (exp_dir / "questions.md").exists(): score += 1 if (exp_dir / "EXPERIMENT_README.md").exists(): score += 1 if (exp_dir / "HUGGINGFACE_REPOS.md").exists(): score += 1 if (exp_dir / "experiment.yaml").exists(): score += 1 sub_dir = exp_dir / "experiments" if sub_dir.exists() and any(sub_dir.glob("*.md")): score += 1 return score def parse_hf_repos(content: str) -> list[dict]: """Extract HF repo links from HUGGINGFACE_REPOS.md markdown tables.""" repos = [] seen = set() # Match markdown links like [name](https://huggingface.co/datasets/org/repo) link_pattern = re.compile(r'\[([^\]]*)\]\(https://huggingface\.co/datasets/([^)]+)\)') for match in link_pattern.finditer(content): name, repo = match.groups() if repo not in seen: seen.add(repo) repos.append({"repo": repo, "description": name.strip(), "date": ""}) # Also match plain repo references like reasoning-degeneration-dev/something plain_pattern = re.compile(r'(?:^|\s)(reasoning-degeneration-dev/[\w-]+)') for match in plain_pattern.finditer(content): repo = match.group(1).strip() if repo not in seen: seen.add(repo) repos.append({"repo": repo, "description": "", "date": ""}) return repos def load_experiment(exp_dir: Path) -> tuple[dict, list[dict], list[dict], list[dict], list[dict]]: """Load a single experiment directory. Returns (experiment, runs, sub_experiments, experiment_notes, activity_log).""" name = exp_dir.name # Load config config = {} config_path = exp_dir / "experiment.yaml" if config_path.exists(): with open(config_path) as f: config = yaml.safe_load(f) or {} # Hypothesis hyp_raw = config.get("hypothesis", {}) if isinstance(hyp_raw, str): hyp_raw = {"statement": hyp_raw} hypothesis = { "statement": hyp_raw.get("statement", ""), "type": hyp_raw.get("type", "exploration"), "status": hyp_raw.get("status", "pending"), "success_criteria": hyp_raw.get("success_criteria", ""), } # Stage from hypothesis status stage = STAGE_MAP.get(hypothesis["status"], "active") if not (exp_dir / "EXPERIMENT_README.md").exists() and not config: stage = "idea" # Models models_raw = config.get("models", []) models = [] for m in models_raw: if isinstance(m, dict): mid = m.get("id", "") # Clean up provider prefix for display short = mid.split("/")[-1] if "/" in mid else mid if short and short not in models: models.append(short) elif isinstance(m, str) and m not in models: models.append(m) # Tasks tasks = [] eval_cfg = config.get("evaluation", {}) if isinstance(eval_cfg, dict): task = eval_cfg.get("task", "") if task: tasks.append(task) extra_tasks = eval_cfg.get("extra", {}).get("additional_tasks", []) tasks.extend(extra_tasks) # Tags obs = config.get("observability", {}) tags = obs.get("tags", []) if isinstance(obs, dict) else [] # Notes from EXPERIMENT_README.md notes = "" readme_path = exp_dir / "EXPERIMENT_README.md" if readme_path.exists(): with open(readme_path) as f: notes = f.read() # HF repos hf_repos = [] hf_path = exp_dir / "HUGGINGFACE_REPOS.md" if hf_path.exists(): with open(hf_path) as f: hf_repos = parse_hf_repos(f.read()) # Wandb wandb_project = obs.get("wandb_project", "") if isinstance(obs, dict) else "" wandb_url = f"https://wandb.ai/{wandb_project}" if wandb_project else "" # Completeness completeness = compute_completeness(exp_dir, config) # Zayne's custom files (zaynes/ folder) def _load_zayne_file(filename: str) -> str: p = exp_dir / "zaynes" / filename if p.exists(): with open(p) as f: content = f.read().strip() if content and not content.startswith("