"""
FRANKENSTALLM 3B — Full Evaluation Pipeline Orchestrator
=========================================================

Runs 4 phases sequentially:
  Phase 0 — Convert checkpoint to HuggingFace format (scripts/convert_to_hf.py)
  Phase 1 — Internal evaluation across up to 8 GPUs (one isolated subprocess.Popen per task)
  Phase 2 — Standard benchmarks via lm-eval-harness (GPU-parallel; 0-shot, then best-effort 5-shot)
  Phase 3 — Report generation (eval/report_generator.py)

Usage:
  python eval/full_eval_pipeline.py
  python eval/full_eval_pipeline.py --dry-run
  python eval/full_eval_pipeline.py --skip-phase0 --skip-phase2
  python eval/full_eval_pipeline.py --checkpoint checkpoints/.../checkpoint-NNNNNNN
  python eval/full_eval_pipeline.py --output-dir eval/outputs/my_run
  python eval/full_eval_pipeline.py --gpus 2,3,4,5,6,7
"""
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import logging |
| import multiprocessing as mp |
| import os |
| import subprocess |
| import sys |
| import time |
| import traceback |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| |
| |
| |
# Repository root: two directories above this file (this file lives in eval/).
# It is prepended to sys.path so that project packages such as
# `eval.report_generator` (imported lazily in run_phase3) resolve when this
# file is executed directly as a script.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(_PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(_PROJECT_ROOT))
|
|
| |
| |
| |
# Default checkpoint directory evaluated when --checkpoint is not given.
CHECKPOINT = str(
    _PROJECT_ROOT / "checkpoints" / "korean_3b_fp8_run1" / "checkpoint-0057000"
)
# Tokenizer file handed to scripts/convert_to_hf.py during Phase 0.
TOKENIZER_PATH = str(
    _PROJECT_ROOT / "tokenizer" / "korean_sp" / "tokenizer.json"
)
# Directory the dry run checks for the Phase 1 *_val.bin perplexity files.
DATA_DIR = _PROJECT_ROOT / "data"
# Evaluation hyper-parameters. Within this file they are only logged (banner
# and dry run). NOTE(review): they are NOT forwarded to task_runner.py by
# _spawn_task(); confirm task_runner uses matching values.
SEQ_LEN = 2048
STRIDE = 512
BATCH_SIZE = 32
|
|
| |
| |
| _NUMA_CORES: Dict[int, List[int]] = { |
| 0: list(range(0, 36)), |
| 1: list(range(0, 36)), |
| 2: list(range(0, 36)), |
| 3: list(range(0, 36)), |
| 4: list(range(36, 72)), |
| 5: list(range(36, 72)), |
| 6: list(range(36, 72)), |
| 7: list(range(36, 72)), |
| } |
|
|
| |
# Phase 1 perplexity validation files, grouped under keys 0-4. The keys are
# presumably the PPL GPU slots of the default 8-GPU layout; the grouping
# matches _PPL_GROUPS. The dry run walks these values to check that every file
# exists under DATA_DIR.
# NOTE(review): this duplicates the file lists in _PPL_GROUPS, which is what
# task building actually uses. Keep the two in sync, or derive one from the other.
_PHASE1_PPL_FILES: Dict[int, List[str]] = {
    0: ["3b_val.bin"],
    1: ["korean_c4_val.bin", "korean_val.bin"],
    2: ["hplt_ko_val.bin", "cc100_ko_val.bin"],
    3: [
        "cosmo_auto_math_text_val.bin",
        "cosmo_stories_val.bin",
        "cosmo_web_v2_val.bin",
        "cosmo_stanford_val.bin",
        "cosmo_khanacademy_val.bin",
        "cosmo_openstax_val.bin",
        "cosmo_wikihow_val.bin",
    ],
    4: [
        "korean_namuwiki_val.bin",
        "korean_wiki_val.bin",
        "namuwiki_2023b_val.bin",
        "wikipedia_ko_val.bin",
        "mathpile_val.bin",
        "open_web_math_val.bin",
        "val.bin",
    ],
}
|
|
| |
# lm-eval task lists keyed by what are presumably GPU indices.
# NOTE(review): not referenced anywhere in this file. The Phase 2 layout is
# built from _BENCHMARK_GROUPS by _build_phase2_tasks(), whose grouping differs
# from this table. This looks like a leftover; confirm and remove.
_PHASE2_GPU_TASKS: Dict[int, List[str]] = {
    0: ["kobest_boolq", "kobest_copa"],
    1: ["kobest_hellaswag", "kobest_sentineg"],
    2: ["kobest_wic"],
    3: ["haerae"],
}
| |
|
|
| |
| |
| |
# Root logging is configured at import time: INFO level, timestamped lines.
# Pipeline messages in this module go through the "full_eval" logger below.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("full_eval")
|
|
|
|
| |
| |
| |
|
|
def set_numa_affinity(gpu_id: int) -> None:
    """Pin the calling process to the CPU cores local to *gpu_id*'s NUMA node.

    GPU 0-3 -> cores 0-35  (NUMA node 0)
    GPU 4-7 -> cores 36-71 (NUMA node 1)

    GPU ids without an entry in ``_NUMA_CORES`` fall back to all 72 cores.
    On platforms that lack ``os.sched_setaffinity`` this is a silent no-op.
    If the kernel rejects the request (``OSError``), a warning is printed and
    execution continues.
    """
    target_cores = _NUMA_CORES[gpu_id] if gpu_id in _NUMA_CORES else list(range(72))
    setter = getattr(os, "sched_setaffinity", None)
    if setter is None:
        # e.g. macOS / Windows: affinity pinning is simply unavailable.
        return
    try:
        setter(0, target_cores)  # pid 0 == the calling process
    except OSError as exc:
        print(f"[WARN] NUMA affinity set failed for GPU {gpu_id}: {exc}", flush=True)
|
|
|
|
| |
| |
| |
|
|
def _isolate_gpu(gpu_id: int) -> None:
    """Restrict the *current* process to one GPU and its NUMA-local CPU cores.

    Exports ``CUDA_VISIBLE_DEVICES=<gpu_id>``, so a CUDA runtime initialised
    after this call should see only that device (as cuda:0). It then applies
    CPU affinity through set_numa_affinity().

    NOTE(review): nothing in this file calls this helper. Subprocess isolation
    is done by _spawn_task(), which sets CUDA_VISIBLE_DEVICES in the child's
    environment instead.
    """
    os.environ.update(CUDA_VISIBLE_DEVICES=str(gpu_id))
    set_numa_affinity(gpu_id)
|
|
|
|
def _spawn_task(
    task_name: str,
    gpu_id: int,
    output_path: Path,
    label: str,
    extra_args: Optional[Dict[str, str]] = None,
) -> Tuple[subprocess.Popen, str, Path, Any]:
    """Launch one evaluation task as an isolated ``task_runner.py`` subprocess.

    The child is started as::

        CUDA_VISIBLE_DEVICES=<gpu_id> python eval/tasks/task_runner.py \\
            --task <task_name> --gpu-id <gpu_id> --output <output_path> [extra_args...]

    The project root is the child's working directory. Its stdout and stderr
    both go to ``output_path`` with a ``.log`` suffix.

    Args:
        task_name: Value passed as ``--task``.
        gpu_id: Physical GPU index. It is exported as CUDA_VISIBLE_DEVICES for
            the child only, and also passed as ``--gpu-id``.
        output_path: JSON result path the child is expected to write. Its parent
            directory is created if needed.
        label: Human-readable name used in logs and as the result key later.
        extra_args: Additional ``flag -> value`` CLI pairs, appended in dict
            order. Values are converted with ``str()``.

    Returns:
        ``(process, label, output_path, log_file)``. The caller owns
        ``log_file`` and must close it after the process finishes.

    Raises:
        Any error from opening the log file or from ``subprocess.Popen``. If
        Popen fails, the log file is closed before the error propagates.
    """
    cmd = [
        sys.executable,
        str(_PROJECT_ROOT / "eval" / "tasks" / "task_runner.py"),
        "--task", task_name,
        "--gpu-id", str(gpu_id),
        "--output", str(output_path),
    ]
    for flag, value in (extra_args or {}).items():
        # str() lets callers pass ints/Paths without Popen raising TypeError.
        cmd.extend([flag, str(value)])

    # Only the child's environment is changed; the parent keeps seeing all GPUs.
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    log_path = output_path.with_suffix(".log")
    log_file = open(log_path, "w")

    logger.info(" Spawning: %s (GPU %d)", label, gpu_id)
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=log_file,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=str(_PROJECT_ROOT),
        )
    except BaseException:
        # No process will ever own this handle, so close it here rather than leak it.
        log_file.close()
        raise
    return proc, label, output_path, log_file
|
|
|
|
def _wait_and_collect(
    processes: List[Tuple[subprocess.Popen, str, Path, Any]],
    max_timeout_sec: float = 3600.0,
    terminate_grace_sec: float = 30.0,
) -> Dict[str, Any]:
    """Poll spawned task processes until all finish, collecting their JSON results.

    *processes* are the tuples returned by _spawn_task(). Running processes are
    polled every 2 s. When one exits, its log handle is closed and:

    * exit code 0 with ``output_path`` present -> the parsed JSON is stored;
    * the JSON cannot be read/parsed -> ``{"error": "JSON parse failed: ..."}``;
    * any other exit (including exit 0 with no output file) -> ``{"error": ...}``
      with the exit code and the last 2000 characters of the task's ``.log``.

    Once *max_timeout_sec* has elapsed since the call, every still-running
    process is sent ``terminate()`` (SIGTERM on POSIX). Any process still alive
    after *terminate_grace_sec* is sent ``kill()``. Every process is reaped, so
    none is left as a zombie or keeps holding a GPU, and each is recorded as
    ``{"error": "Timeout after <N>s"}``.

    Args:
        processes: ``(process, label, output_path, log_file)`` tuples.
        max_timeout_sec: Overall wall-clock budget for the whole batch.
        terminate_grace_sec: Time allowed for a graceful exit after terminate().

    Returns:
        Mapping of task label -> parsed result dict or error dict.
    """
    results: Dict[str, Any] = {}
    success_count = 0
    failure_count = 0
    start_time = time.time()

    remaining = list(processes)
    while remaining:
        still_running = []
        for proc, label, out_path, log_file in remaining:
            ret = proc.poll()
            if ret is None:
                still_running.append((proc, label, out_path, log_file))
                continue

            # The child has exited: release our handle on its log before reading it back.
            log_file.close()
            log_path = out_path.with_suffix(".log")

            if ret == 0 and out_path.exists():
                try:
                    with open(out_path, "r", encoding="utf-8") as f:
                        result = json.load(f)
                except Exception as exc:
                    results[label] = {"error": f"JSON parse failed: {exc}"}
                    failure_count += 1
                    logger.error(" [FAILED] %s β JSON parse error: %s", label, exc)
                else:
                    results[label] = result
                    success_count += 1
                    logger.info(" [DONE] %s", label)
                continue

            error_msg = f"Process exited with code {ret}"
            if ret == 0:
                # A clean exit without a result file is still a failure; say why.
                error_msg += f" but did not write {out_path}"
            try:
                log_text = log_path.read_text(encoding="utf-8", errors="replace")[-2000:]
                error_msg += f"\n--- Last log output ---\n{log_text}"
            except OSError:
                pass  # Log unreadable: report the exit code only.
            results[label] = {"error": error_msg}
            failure_count += 1
            logger.error(" [FAILED] %s β exit code %d", label, ret)

        remaining = still_running

        if remaining and (time.time() - start_time) > max_timeout_sec:
            logger.error(
                " Timeout reached (%.0fs). Terminating %d remaining processes.",
                max_timeout_sec, len(remaining),
            )
            # Signal everyone first so the grace periods run concurrently.
            for proc, _label, _out_path, _log_file in remaining:
                proc.terminate()
            deadline = time.time() + terminate_grace_sec
            for proc, label, _out_path, log_file in remaining:
                try:
                    proc.wait(timeout=max(0.0, deadline - time.time()))
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()  # reap so no zombie remains
                log_file.close()
                results[label] = {"error": f"Timeout after {max_timeout_sec:.0f}s"}
                failure_count += 1
                logger.error(" [TIMEOUT] %s", label)
            remaining = []
            break

        if remaining:
            time.sleep(2)

    logger.info(" Complete: %d succeeded, %d failed", success_count, failure_count)
    return results
|
|
|
|
| |
| |
| |
|
|
| |
# Phase 1 perplexity groups as (validation files, display label).
# _build_phase1_tasks() gives each group its own PPL GPU. When there are fewer
# PPL GPUs than groups, it repeatedly merges the last two groups until they fit.
# A single-file group runs as task "ppl_single" (--val-file); a multi-file group
# runs as "ppl_multi" (--val-files, comma-separated).
_PPL_GROUPS = [
    (["3b_val.bin"], "PPL: 3b_val.bin"),
    (["korean_c4_val.bin", "korean_val.bin"], "PPL: korean_c4 + korean_val"),
    (["hplt_ko_val.bin", "cc100_ko_val.bin"], "PPL: hplt_ko + cc100_ko"),
    ([
        "cosmo_auto_math_text_val.bin", "cosmo_stories_val.bin",
        "cosmo_web_v2_val.bin", "cosmo_stanford_val.bin",
        "cosmo_khanacademy_val.bin", "cosmo_openstax_val.bin",
        "cosmo_wikihow_val.bin",
    ], "PPL: 7 cosmo files"),
    ([
        "korean_namuwiki_val.bin", "korean_wiki_val.bin",
        "namuwiki_2023b_val.bin", "wikipedia_ko_val.bin",
        "mathpile_val.bin", "open_web_math_val.bin", "val.bin",
    ], "PPL: 7 remaining files"),
]
|
|
|
|
| def _build_phase1_tasks(gpu_ids: List[int]) -> List[Dict[str, Any]]: |
| """Build Phase 1 task descriptors adapted to available GPUs. |
| |
| Returns a list of dicts with keys: |
| - task : task_runner.py --task value |
| - gpu_id : GPU to assign |
| - label : human-readable description |
| - extra_args: dict of additional CLI flags (--val-file, --val-files, etc.) |
| |
| Strategy: |
| - Reserve last 2-3 GPUs for non-PPL tasks (calib+NLL, generation, repetition) |
| - Distribute PPL groups across remaining GPUs, merging if necessary |
| """ |
| n = len(gpu_ids) |
| tasks: List[Dict[str, Any]] = [] |
|
|
| if n < 3: |
| raise ValueError(f"Need at least 3 GPUs, got {n}: {gpu_ids}") |
|
|
| |
| rep_gpu = gpu_ids[-1] |
| |
| gen_gpu = gpu_ids[-2] |
|
|
| |
| if n >= 4: |
| calib_gpu = gpu_ids[-3] |
| ppl_gpus = gpu_ids[:-3] |
| tasks.append({ |
| "task": "calib_nll", |
| "gpu_id": calib_gpu, |
| "label": f"GPU {calib_gpu} β Calibration + Token NLL", |
| "extra_args": {}, |
| }) |
| tasks.append({ |
| "task": "generation", |
| "gpu_id": gen_gpu, |
| "label": f"GPU {gen_gpu} β Generation (15 prompts Γ 4 temps)", |
| "extra_args": {}, |
| }) |
| else: |
| |
| ppl_gpus = gpu_ids[:-2] |
| tasks.append({ |
| "task": "calib_nll_and_gen", |
| "gpu_id": gen_gpu, |
| "label": f"GPU {gen_gpu} β Calibration + NLL + Generation", |
| "extra_args": {}, |
| }) |
|
|
| tasks.append({ |
| "task": "repetition_grid", |
| "gpu_id": rep_gpu, |
| "label": f"GPU {rep_gpu} β Repetition grid (12 Γ 5)", |
| "extra_args": {}, |
| }) |
|
|
| |
| if len(ppl_gpus) == 0: |
| |
| all_files = [] |
| for files, _ in _PPL_GROUPS: |
| all_files.extend(files) |
| tasks.insert(0, { |
| "task": "ppl_multi", |
| "gpu_id": gpu_ids[0], |
| "label": f"GPU {gpu_ids[0]} β PPL: all {len(all_files)} val files", |
| "extra_args": {"--val-files": ",".join(all_files)}, |
| }) |
| elif len(ppl_gpus) >= len(_PPL_GROUPS): |
| |
| for i, (files, label) in enumerate(_PPL_GROUPS): |
| gpu = ppl_gpus[i] |
| if len(files) == 1: |
| tasks.append({ |
| "task": "ppl_single", |
| "gpu_id": gpu, |
| "label": f"GPU {gpu} β {label}", |
| "extra_args": {"--val-file": files[0]}, |
| }) |
| else: |
| tasks.append({ |
| "task": "ppl_multi", |
| "gpu_id": gpu, |
| "label": f"GPU {gpu} β {label}", |
| "extra_args": {"--val-files": ",".join(files)}, |
| }) |
| else: |
| |
| merged: List[Tuple[List[str], str]] = list(_PPL_GROUPS) |
| while len(merged) > len(ppl_gpus): |
| a_files, a_label = merged.pop() |
| b_files, b_label = merged.pop() |
| merged.append((b_files + a_files, f"{b_label} + {a_label}")) |
| for i, (files, label) in enumerate(merged): |
| gpu = ppl_gpus[i] |
| if len(files) == 1: |
| tasks.append({ |
| "task": "ppl_single", |
| "gpu_id": gpu, |
| "label": f"GPU {gpu} β {label}", |
| "extra_args": {"--val-file": files[0]}, |
| }) |
| else: |
| tasks.append({ |
| "task": "ppl_multi", |
| "gpu_id": gpu, |
| "label": f"GPU {gpu} β {label}", |
| "extra_args": {"--val-files": ",".join(files)}, |
| }) |
|
|
| return tasks |
|
|
|
|
| |
| |
| |
|
|
| def _bar(char: str = "=", width: int = 72) -> str: |
| return char * width |
|
|
|
|
def _print_banner(title: str) -> None:
    """Log *title* framed above and below by a 72-character '=' rule."""
    rule = _bar()
    logger.info(rule)
    logger.info(" %s", title)
    logger.info(rule)
|
|
|
|
def _print_phase_header(phase: str, description: str) -> None:
    """Log a blank line, then *phase* and *description* between two '-' rules."""
    rule = _bar("-")
    logger.info("")
    logger.info(rule)
    logger.info(" %s β %s", phase, description)
    logger.info(rule)
|
|
|
|
| def _fmt_seconds(seconds: float) -> str: |
| m, s = divmod(int(seconds), 60) |
| h, m = divmod(m, 60) |
| if h: |
| return f"{h}h {m}m {s}s" |
| if m: |
| return f"{m}m {s}s" |
| return f"{s}s" |
|
|
|
|
| |
| |
| |
|
|
# Rough per-task runtime estimates for the default 8-GPU Phase 1 layout.
# NOTE(review): not referenced anywhere in this file, and several keys do not
# match the labels _build_phase1_tasks() actually produces. For example,
# "PPL: korean_c4_val + korean_val" here vs "PPL: korean_c4 + korean_val", and
# the repetition-grid label wording also differs. Fix the keys before using
# them for lookups.
_ESTIMATED_TIMES = {
    "GPU 0 β PPL: 3b_val.bin": "~10 min",
    "GPU 1 β PPL: korean_c4_val + korean_val": "~15 min",
    "GPU 2 β PPL: hplt_ko_val + cc100_ko_val": "~15 min",
    "GPU 3 β PPL: 7 cosmo files": "~25 min",
    "GPU 4 β PPL: 7 remaining files": "~25 min",
    "GPU 5 β Calibration + Token NLL": "~20 min",
    "GPU 6 β Generation (15 prompts Γ 4 temps)": "~20 min",
    "GPU 7 β Repetition grid (12 settings Γ 5 prompts)": "~15 min",
}
|
|
|
|
def _dry_run(args: argparse.Namespace, checkpoint: str, output_dir: Path,
             gpu_ids: Optional[List[int]] = None) -> None:
    """Print the planned configuration and task layout without loading any model.

    Logs the following, then ends the interpreter via ``sys.exit``:

    * the resolved paths and eval hyper-parameters;
    * the Phase 1 task table;
    * whether each Phase 1 validation file and the checkpoint exist;
    * the planned Phase 0 HF output path;
    * the Phase 2 task table;
    * the GPU -> CPU-core mapping.

    Args:
        args: Parsed CLI namespace (not used here; kept for interface stability).
        checkpoint: Checkpoint directory that Phase 0 would convert.
        output_dir: Run output directory, used to show the HF output path.
        gpu_ids: GPUs to plan for. Defaults to 0-7.

    Raises:
        SystemExit: always. The exit code is 0 normally, or 1 when the GPU list
            cannot host the Phase 1 layout (fewer than 3 GPUs).
    """
    _print_banner("DRY RUN β FRANKENSTALLM 3B Full Eval Pipeline")

    logger.info(" Checkpoint : %s", checkpoint)
    logger.info(" Tokenizer : %s", TOKENIZER_PATH)
    logger.info(" Data dir : %s", DATA_DIR)
    logger.info(" Output dir : %s", output_dir)
    logger.info(" SEQ_LEN : %d", SEQ_LEN)
    logger.info(" STRIDE : %d", STRIDE)
    logger.info(" BATCH_SIZE : %d", BATCH_SIZE)

    if gpu_ids is None:
        gpu_ids = list(range(8))
    config_ok = True

    # Phase 1: task -> GPU layout.
    _print_phase_header("Phase 1", f"Internal Eval β {len(gpu_ids)} GPU Task Distribution")
    try:
        phase1_tasks = _build_phase1_tasks(gpu_ids)
    except ValueError as exc:
        # E.g. fewer than 3 GPUs: report it as a config error instead of a traceback.
        logger.error(" Invalid Phase 1 GPU layout: %s", exc)
        phase1_tasks = []
        config_ok = False
    col_w = 60
    logger.info(" %-6s %-*s %s", "GPU", col_w, "Task", "NUMA")
    logger.info(" %s %s %s", "-" * 6, "-" * col_w, "-" * 20)
    for desc in phase1_tasks:
        gpu_id = desc["gpu_id"]
        label = desc["label"]
        numa_node = 0 if gpu_id < 4 else 1
        cores = _NUMA_CORES.get(gpu_id, [])
        core_range = f"cores {cores[0]}-{cores[-1]}" if cores else "?"
        logger.info(" cuda:%-2d %-*s [NUMA %d, %s]",
                    gpu_id, col_w, label, numa_node, core_range)

    # Phase 1: validation files.
    _print_phase_header("Phase 1", "Val File Existence Check")
    all_files: List[str] = [fname for files in _PHASE1_PPL_FILES.values() for fname in files]
    missing = []
    for fname in all_files:
        fpath = DATA_DIR / fname
        status = "OK" if fpath.exists() else "MISSING"
        logger.info(" [%s] %s", status, fpath)
        if status == "MISSING":
            missing.append(fname)

    if missing:
        logger.warning(" %d val file(s) missing β those tasks will be skipped at runtime.", len(missing))
    else:
        logger.info(" All %d val files present.", len(all_files))

    # Phase 0: source checkpoint and HF output location.
    _print_phase_header("Phase 0", "Checkpoint Existence Check")
    ckpt_path = Path(checkpoint)
    if ckpt_path.exists():
        logger.info(" [OK] Checkpoint found: %s", ckpt_path)
    else:
        logger.warning(" [MISSING] Checkpoint not found: %s", ckpt_path)

    hf_output = output_dir / f"hf_3b_{ckpt_path.name}"
    logger.info(" HF output will be: %s", hf_output)

    # Phase 2: lm-eval groups per GPU.
    _print_phase_header("Phase 2", f"lm-eval Benchmark Distribution (0-shot, {len(gpu_ids)} GPUs)")
    logger.info(" %-6s %-60s", "GPU", "Tasks")
    logger.info(" %s %s", "-" * 6, "-" * 60)
    for gpu_id, _task_names, label in _build_phase2_tasks(gpu_ids):
        logger.info(" cuda:%-2d %s", gpu_id, label)

    # GPU -> CPU core mapping.
    _print_phase_header("NUMA Affinity", "GPU β Core Mapping")
    logger.info(" %-6s %-10s %-12s %s", "GPU", "NUMA node", "Core range", "Cores")
    logger.info(" %s %s %s %s", "-" * 6, "-" * 10, "-" * 12, "-" * 12)
    for gpu_id in gpu_ids:
        cores = _NUMA_CORES.get(gpu_id)
        if cores is None:
            # GPU ids outside _NUMA_CORES (e.g. >= 8) must not raise KeyError here;
            # set_numa_affinity() pins such GPUs to all 72 cores.
            logger.warning(" cuda:%-2d no NUMA mapping; affinity falls back to all 72 cores", gpu_id)
            continue
        numa = 0 if gpu_id < 4 else 1
        logger.info(" cuda:%-2d node %-5d %3d - %-5d (%d cores)",
                    gpu_id, numa, cores[0], cores[-1], len(cores))

    logger.info("")
    if not config_ok:
        logger.error(" Dry run found configuration errors (see above).")
        sys.exit(1)
    logger.info(" Dry run complete. No models were loaded.")
    sys.exit(0)
|
|
|
|
| |
| |
| |
|
|
def run_phase0(checkpoint: str, output_dir: Path) -> Path:
    """Convert *checkpoint* to HuggingFace format by running scripts/convert_to_hf.py.

    The converted model goes to ``<output_dir>/hf_3b_<checkpoint dir name>``,
    which is created beforehand if missing. TOKENIZER_PATH is passed as the
    tokenizer. The script runs with the parent's stdout/stderr.

    Returns:
        Path of the HF output directory.

    Raises:
        RuntimeError: if the conversion script exits with a non-zero status.
    """
    hf_output = output_dir / ("hf_3b_" + Path(checkpoint).name)
    hf_output.mkdir(parents=True, exist_ok=True)

    cmd = [sys.executable, str(_PROJECT_ROOT / "scripts" / "convert_to_hf.py")]
    cmd += ["--checkpoint", checkpoint]
    cmd += ["--output", str(hf_output)]
    cmd += ["--tokenizer", TOKENIZER_PATH]
    logger.info(" Running: %s", " ".join(cmd))

    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(
            f"Phase 0 failed: convert_to_hf.py exited with {exc.returncode}"
        ) from exc

    logger.info(" HF checkpoint saved to: %s", hf_output)
    return hf_output
|
|
|
|
| |
| |
| |
|
|
def run_phase1(output_dir: Path, gpu_ids: List[int]) -> Dict[str, Any]:
    """Run all Phase 1 internal-eval tasks in parallel across *gpu_ids*.

    Each descriptor from _build_phase1_tasks() is launched as its own
    task_runner.py subprocess. Its result is written to
    ``phase1_<task>_gpu<id>.json`` in *output_dir*. Once every process has
    finished (or timed out):

    * all results are saved to ``phase1_results.json``;
    * successful generation / repetition-grid results are also saved to
      ``generation_samples.json``.

    Returns:
        Mapping of task label -> result dict (or error dict).
    """
    processes: List[Tuple[subprocess.Popen, str, Path, Any]] = [
        _spawn_task(
            task_name=desc["task"],
            gpu_id=desc["gpu_id"],
            output_path=output_dir / f"phase1_{desc['task']}_gpu{desc['gpu_id']}.json",
            label=desc["label"],
            extra_args=desc.get("extra_args"),
        )
        for desc in _build_phase1_tasks(gpu_ids)
    ]
    results = _wait_and_collect(processes)

    phase1_out = output_dir / "phase1_results.json"
    _save_json(results, phase1_out)
    logger.info(" Phase 1 results saved: %s", phase1_out)

    # Pick out the text-producing tasks by label so they can be inspected separately.
    gen_samples: Dict[str, Any] = {}
    for label, result in results.items():
        if not isinstance(result, dict) or "error" in result:
            continue
        if "Generation" in label:
            gen_samples["generation"] = result
        elif "Repetition" in label:
            gen_samples["repetition_grid"] = result

    if gen_samples:
        gen_out = output_dir / "generation_samples.json"
        _save_json(gen_samples, gen_out)
        logger.info(" Generation samples saved: %s", gen_out)

    return results
|
|
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
# Phase 2 lm-eval benchmark groups as (lm-eval task names, display label).
# _build_phase2_tasks() assigns group i to gpu_ids[i % len(gpu_ids)], so with
# 8 GPUs every group gets its own GPU. With fewer GPUs, groups landing on the
# same GPU are concatenated and run as a single lm_eval subprocess.
_BENCHMARK_GROUPS = [
    (["kobest_boolq", "kobest_copa", "kobest_wic"], "KoBEST: boolq + copa + wic"),
    (["kobest_hellaswag", "kobest_sentineg"], "KoBEST: hellaswag + sentineg"),
    (["haerae"], "HAE-RAE (all subtasks)"),
    (["global_mmlu_ko"], "MMLU-KO (57 subtasks)"),
    (["hellaswag", "arc_easy", "arc_challenge"], "EN: hellaswag + arc_easy + arc_challenge"),
    (["winogrande", "piqa"], "EN: winogrande + piqa"),
    (["mmlu_humanities", "mmlu_social_sciences"], "MMLU-EN: humanities + social_sciences"),
    (["mmlu_stem", "mmlu_other"], "MMLU-EN: stem + other"),
]
|
|
|
|
| def _build_phase2_tasks(gpu_ids: List[int]) -> List[Tuple[int, List[str], str]]: |
| """Distribute lm-eval benchmark tasks across available GPUs.""" |
| n = len(gpu_ids) |
| task_list: List[Tuple[int, List[str], str]] = [] |
|
|
| if n <= 0: |
| return task_list |
|
|
| |
| for i, (tasks, label) in enumerate(_BENCHMARK_GROUPS): |
| gpu_id = gpu_ids[i % n] |
| |
| existing = None |
| for j, (gid, existing_tasks, existing_label) in enumerate(task_list): |
| if gid == gpu_id: |
| existing = j |
| break |
| if existing is not None: |
| gid, existing_tasks, existing_label = task_list[existing] |
| task_list[existing] = (gid, existing_tasks + tasks, |
| f"{existing_label} + {label}") |
| else: |
| task_list.append((gpu_id, tasks, f"GPU {gpu_id} β {label}")) |
|
|
| return task_list |
|
|
|
|
def _spawn_phase2_batch(
    hf_model_path: Path,
    output_dir: Path,
    gpu_task_list: List[Tuple[int, List[str], str]],
    num_fewshot: int,
    label_suffix: str,
) -> Dict[str, Any]:
    """Launch one ``lm_eval`` subprocess per GPU entry for a single few-shot setting.

    Each entry runs its comma-joined task names against *hf_model_path*. The
    result goes to ``phase2_gpu<id>_<k>shot<label_suffix>.json`` and the label
    is prefixed with ``[<k>-shot]``. Blocks until every process has finished
    or timed out.

    Returns:
        Mapping of prefixed label -> result dict (or error dict).
    """
    launched: List[Tuple[subprocess.Popen, str, Path, Any]] = []
    for gpu_id, task_names, label in gpu_task_list:
        result_file = output_dir / f"phase2_gpu{gpu_id}_{num_fewshot}shot{label_suffix}.json"
        cli_flags = {
            "--hf-model-path": str(hf_model_path),
            "--lm-eval-tasks": ",".join(task_names),
            "--num-fewshot": str(num_fewshot),
        }
        launched.append(
            _spawn_task("lm_eval", gpu_id, result_file, f"[{num_fewshot}-shot] {label}", cli_flags)
        )
    return _wait_and_collect(launched)
|
|
|
|
def run_phase2(
    hf_model_path: Path,
    output_dir: Path,
    gpu_ids: Optional[List[int]] = None,
    num_fewshot: int = 0,
) -> Dict[str, Any]:
    """Run lm-eval benchmarks across the given GPUs in parallel.

    Each GPU runs its benchmark group as an isolated task_runner.py subprocess.
    When *num_fewshot* is 0, a best-effort 5-shot pass follows. Its results (or
    ``{"error": <traceback>}`` if the pass raised) are stored under the
    ``"5shot"`` key. The 0-shot results are written to ``phase2_results.json``
    before the 5-shot pass starts, so they survive if that long pass is
    interrupted. The file is rewritten with the combined results at the end.

    Args:
        hf_model_path: Converted HuggingFace model directory.
        output_dir: Run output directory.
        gpu_ids: GPUs to use. Defaults to 0-7.
        num_fewshot: Few-shot count for the main pass.

    Returns:
        Mapping of task label -> result, plus ``"5shot"`` when applicable.
    """
    if gpu_ids is None:
        gpu_ids = list(range(8))

    gpu_task_list = _build_phase2_tasks(gpu_ids)
    phase2_out = output_dir / "phase2_results.json"

    logger.info(" Running %d-shot benchmarks on %d GPUs ...", num_fewshot, len(gpu_ids))
    results = _spawn_phase2_batch(hf_model_path, output_dir, gpu_task_list, num_fewshot, "")
    logger.info(" Phase 2 (%d-shot) complete.", num_fewshot)

    if num_fewshot == 0:
        # Persist the 0-shot numbers before the long, best-effort 5-shot pass.
        _save_json(results, phase2_out)
        logger.info(" Attempting 5-shot benchmarks ...")
        try:
            five_shot_results = _spawn_phase2_batch(
                hf_model_path, output_dir, gpu_task_list, 5, "_5shot"
            )
            logger.info(" Phase 2 (5-shot) complete.")
        except Exception:
            tb = traceback.format_exc()
            logger.warning(" 5-shot benchmarks failed (non-fatal): %s", tb)
            five_shot_results = {"error": tb}
        results["5shot"] = five_shot_results

    _save_json(results, phase2_out)
    logger.info(" Phase 2 results saved: %s", phase2_out)

    return results
|
|
|
|
| |
| |
| |
|
|
def _find_generation_samples(phase1_results: Dict[str, Any]) -> List[Any]:
    """Return the ``samples`` list of the Phase 1 generation result, or ``[]``.

    Matches any label containing "Generation", the same test run_phase1() uses.
    This works for every GPU layout: the dedicated generation task and the
    combined calib+gen task of 3-GPU runs both carry that word in their label.
    """
    for label, result in phase1_results.items():
        if "Generation" in label and isinstance(result, dict) and "samples" in result:
            return result["samples"]
    return []


def run_phase3(
    phase1_results: Dict[str, Any],
    phase2_results: Dict[str, Any],
    output_dir: Path,
    total_elapsed_sec: float = 0.0,
    checkpoint: Optional[str] = None,
) -> Optional[Path]:
    """Generate the markdown report for this run.

    Uses ``eval.report_generator.generate_report`` when it can be imported.
    Otherwise it writes a minimal report via _write_fallback_report().

    Args:
        phase1_results: Phase 1 results keyed by task label.
        phase2_results: Phase 2 results keyed by task label.
        output_dir: Run directory. The returned path is
            ``<output_dir>/full_eval_report.md``. NOTE(review): this assumes
            generate_report writes that filename into the directory it is given.
        total_elapsed_sec: Wall-clock time forwarded to the report.
        checkpoint: Checkpoint path whose directory name appears in the report.
            Defaults to the module-level CHECKPOINT. Pass the actual
            ``--checkpoint`` value so an overridden checkpoint is named correctly.

    Returns:
        The report path, or None if report generation failed.
    """
    report_path = output_dir / "full_eval_report.md"
    try:
        from eval.report_generator import generate_report

        generate_report(
            phase1_results=phase1_results,
            phase2_results=phase2_results,
            generation_samples=_find_generation_samples(phase1_results),
            output_dir=report_path.parent,
            checkpoint_name=Path(checkpoint or CHECKPOINT).name,
            total_elapsed_sec=total_elapsed_sec,
        )
        logger.info(" Report saved: %s", report_path)
        return report_path
    except ImportError:
        logger.warning(
            " eval.report_generator not found β generating minimal fallback report."
        )
    except Exception:
        logger.error(" Phase 3 report generation failed:\n%s", traceback.format_exc())
        return None

    # Only reached from the ImportError branch. Guard the fallback too, so a
    # write error cannot crash the pipeline's final summary.
    try:
        _write_fallback_report(phase1_results, phase2_results, report_path)
    except Exception:
        logger.error(" Phase 3 fallback report failed:\n%s", traceback.format_exc())
        return None
    return report_path
|
|
|
|
| def _write_fallback_report( |
| phase1_results: Dict[str, Any], |
| phase2_results: Dict[str, Any], |
| report_path: Path, |
| ) -> None: |
| """Write a simple markdown report when report_generator is unavailable.""" |
| lines: List[str] = [ |
| "# FRANKENSTALLM 3B β Full Evaluation Report", |
| "", |
| f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", |
| "", |
| "## Phase 1 Results", |
| "", |
| ] |
| for label, result in phase1_results.items(): |
| lines.append(f"### {label}") |
| if isinstance(result, dict) and "error" in result: |
| lines.append(f"**FAILED**: {result['error'][:200]}") |
| else: |
| lines.append(f"```json\n{json.dumps(result, indent=2, ensure_ascii=False, default=str)[:2000]}\n```") |
| lines.append("") |
|
|
| lines += [ |
| "## Phase 2 Results", |
| "", |
| ] |
| for label, result in phase2_results.items(): |
| lines.append(f"### {label}") |
| if isinstance(result, dict) and "error" in result: |
| lines.append(f"**FAILED**: {result['error'][:200]}") |
| else: |
| lines.append(f"```json\n{json.dumps(result, indent=2, ensure_ascii=False, default=str)[:2000]}\n```") |
| lines.append("") |
|
|
| report_path.write_text("\n".join(lines), encoding="utf-8") |
|
|
|
|
| |
| |
| |
|
|
| def _save_json(data: Any, path: Path) -> None: |
| """Save data as JSON, converting non-serialisable objects to strings.""" |
| path.parent.mkdir(parents=True, exist_ok=True) |
| with open(path, "w", encoding="utf-8") as f: |
| json.dump(data, f, indent=2, ensure_ascii=False, default=str) |
|
|
|
|
| def _make_output_dir(output_dir_override: Optional[str]) -> Path: |
| if output_dir_override: |
| out = Path(output_dir_override) |
| else: |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M") |
| out = _PROJECT_ROOT / "eval" / "outputs" / f"3b_full_eval_{timestamp}" |
| out.mkdir(parents=True, exist_ok=True) |
| return out |
|
|
|
|
| |
| |
| |
|
|
def parse_args() -> argparse.Namespace:
    """Parse the pipeline's command-line options from ``sys.argv``.

    Boolean switches: --dry-run, --skip-phase0, --skip-phase1, --skip-phase2.
    String overrides, all defaulting to None: --checkpoint, --output-dir, --gpus.
    """
    parser = argparse.ArgumentParser(
        description="FRANKENSTALLM 3B β Full Evaluation Pipeline Orchestrator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Boolean switches, registered in this order.
    switches = (
        ("--dry-run", "Validate task distribution without loading models, then exit."),
        ("--skip-phase0", "Skip HF conversion (reuse existing checkpoint in outputs/)."),
        ("--skip-phase1", "Skip internal 8-GPU evaluation."),
        ("--skip-phase2", "Skip lm-eval-harness benchmarks."),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    # String-valued overrides; None means "use the built-in default".
    overrides = (
        ("--checkpoint", f"Override checkpoint path (default: {CHECKPOINT})"),
        ("--output-dir", "Override output directory (default: eval/outputs/3b_full_eval_YYYYMMDD_HHMM/)"),
        ("--gpus", "Comma-separated GPU IDs to use, e.g. '2,3,4,5,6,7'. Default: all 8 GPUs (0-7)."),
    )
    for flag, help_text in overrides:
        parser.add_argument(flag, type=str, default=None, help=help_text)

    return parser.parse_args()
|
|
|
|
| |
| |
| |
|
|
def _parse_gpu_ids(spec: Optional[str]) -> List[int]:
    """Turn a ``--gpus`` value such as ``"2,3,4"`` into a sorted, de-duplicated list.

    None or an empty string means all 8 GPUs (0-7). Blank entries, such as
    those from a trailing comma, are ignored. Raises SystemExit with a readable
    message on a non-integer or negative id, or if no id remains.
    """
    if not spec:
        return list(range(8))
    ids = set()
    for token in spec.split(","):
        token = token.strip()
        if not token:
            continue
        try:
            gpu = int(token)
        except ValueError:
            raise SystemExit(
                f"Invalid --gpus entry {token!r}: expected comma-separated integers"
            ) from None
        if gpu < 0:
            raise SystemExit(f"Invalid --gpus entry {token!r}: GPU ids must be >= 0")
        ids.add(gpu)
    if not ids:
        raise SystemExit(f"--gpus {spec!r} does not contain any GPU id")
    return sorted(ids)


def _find_existing_hf_checkpoint(output_dir: Path, ckpt_name: str) -> Optional[Path]:
    """Locate a previously converted ``hf_3b_<ckpt_name>`` directory, or return None.

    Checks *output_dir* first, then searches recursively under its parent. If
    several directories match, the most recently modified one is chosen, so the
    result is deterministic.
    """
    dirname = f"hf_3b_{ckpt_name}"
    candidate = output_dir / dirname
    if candidate.exists():
        return candidate
    matches = [p for p in output_dir.parent.glob(f"**/{dirname}") if p.is_dir()]
    if not matches:
        return None
    return max(matches, key=lambda p: p.stat().st_mtime)


def _load_previous_results(path: Path, phase: str) -> Dict[str, Any]:
    """Load a results JSON saved by an earlier run.

    Returns ``{}`` if the file is absent, unreadable, or not a JSON object.
    """
    if not path.exists():
        return {}
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:  # ValueError covers json.JSONDecodeError
        logger.warning(" Could not load existing %s results from %s: %s", phase, path, exc)
        return {}
    if not isinstance(data, dict):
        logger.warning(" Ignoring existing %s results in %s: expected a JSON object", phase, path)
        return {}
    logger.info(" Loaded existing %s results from: %s", phase, path)
    return data


def main() -> None:
    """Run the evaluation pipeline end to end, or a dry run, from CLI arguments.

    Phases run in order 0 -> 1 -> 2 -> 3:

    * A failure in Phase 0, 1 or 2 is logged and the pipeline continues.
    * Phase 2 is skipped when no HF checkpoint is available.
    * Phase 3 always runs on whatever results were collected.

    A summary of output paths and task outcomes is logged at the end.
    """
    # NOTE(review): work here is launched via subprocess, not multiprocessing; the
    # start method only matters if an imported module uses multiprocessing.
    try:
        mp.set_start_method("spawn", force=True)
    except RuntimeError:
        pass

    args = parse_args()
    checkpoint = args.checkpoint or CHECKPOINT
    # Validate --gpus before creating any output directory.
    gpu_ids = _parse_gpu_ids(args.gpus)
    output_dir = _make_output_dir(args.output_dir)

    if args.dry_run:
        _dry_run(args, checkpoint, output_dir, gpu_ids)
        return

    _print_banner("FRANKENSTALLM 3B β Full Evaluation Pipeline")
    logger.info(" Checkpoint : %s", checkpoint)
    logger.info(" Tokenizer : %s", TOKENIZER_PATH)
    logger.info(" Data dir : %s", DATA_DIR)
    logger.info(" Output dir : %s", output_dir)
    logger.info(" GPUs : %s", gpu_ids)
    logger.info(" SEQ_LEN : %d STRIDE: %d BATCH_SIZE: %d",
                SEQ_LEN, STRIDE, BATCH_SIZE)
    logger.info(" Phases : phase0=%s phase1=%s phase2=%s",
                "skip" if args.skip_phase0 else "run",
                "skip" if args.skip_phase1 else "run",
                "skip" if args.skip_phase2 else "run")

    pipeline_start = time.time()
    phase1_results: Dict[str, Any] = {}
    phase2_results: Dict[str, Any] = {}
    hf_model_path: Optional[Path] = None

    # ---------------- Phase 0: HF conversion ----------------
    _print_phase_header("PHASE 0", "HF Checkpoint Conversion")
    if args.skip_phase0:
        ckpt_name = Path(checkpoint).name
        hf_model_path = _find_existing_hf_checkpoint(output_dir, ckpt_name)
        if hf_model_path is not None:
            logger.info(" Skipping Phase 0 β reusing: %s", hf_model_path)
        else:
            logger.warning(
                " --skip-phase0 set but no HF checkpoint found for %s. "
                "Phase 2 will be skipped; run Phase 0 or set --output-dir to a "
                "directory containing the HF checkpoint.",
                ckpt_name,
            )
    else:
        t0 = time.time()
        try:
            hf_model_path = run_phase0(checkpoint, output_dir)
            logger.info(" Phase 0 complete in %s.", _fmt_seconds(time.time() - t0))
        except Exception:
            logger.error(" Phase 0 FAILED:\n%s", traceback.format_exc())
            logger.warning(" Continuing without HF conversion β Phase 2 will be skipped.")

    # ---------------- Phase 1: internal eval ----------------
    _print_phase_header("PHASE 1", f"Internal Evaluation β {len(gpu_ids)} GPU Parallel")
    if args.skip_phase1:
        logger.info(" Skipping Phase 1.")
        phase1_results = _load_previous_results(output_dir / "phase1_results.json", "Phase 1")
    else:
        t0 = time.time()
        try:
            phase1_results = run_phase1(output_dir, gpu_ids)
            logger.info(" Phase 1 complete in %s.", _fmt_seconds(time.time() - t0))
        except Exception:
            logger.error(" Phase 1 FAILED:\n%s", traceback.format_exc())

    # ---------------- Phase 2: lm-eval benchmarks ----------------
    _print_phase_header("PHASE 2", f"lm-eval Benchmarks β {len(gpu_ids)} GPU Parallel")
    if args.skip_phase2:
        logger.info(" Skipping Phase 2.")
        phase2_results = _load_previous_results(output_dir / "phase2_results.json", "Phase 2")
    elif hf_model_path is None:
        logger.warning(" Phase 2 skipped β HF model path unavailable (Phase 0 failed or skipped).")
    else:
        t0 = time.time()
        try:
            phase2_results = run_phase2(hf_model_path, output_dir, gpu_ids=gpu_ids,
                                        num_fewshot=0)
            logger.info(" Phase 2 complete in %s.", _fmt_seconds(time.time() - t0))
        except Exception:
            logger.error(" Phase 2 FAILED:\n%s", traceback.format_exc())

    # ---------------- Phase 3: report ----------------
    _print_phase_header("PHASE 3", "Report Generation")
    t0 = time.time()
    # NOTE(review): run_phase3 is not given `checkpoint` here, so an overridden
    # --checkpoint is not forwarded to it.
    report_path = run_phase3(phase1_results, phase2_results, output_dir,
                             total_elapsed_sec=time.time() - pipeline_start)
    logger.info(" Phase 3 complete in %s.", _fmt_seconds(time.time() - t0))

    # ---------------- Summary ----------------
    total_elapsed = time.time() - pipeline_start
    _print_banner("PIPELINE COMPLETE")
    logger.info(" Total time : %s", _fmt_seconds(total_elapsed))
    logger.info(" Output dir : %s", output_dir)
    logger.info(" Phase 1 results : %s", output_dir / "phase1_results.json")
    logger.info(" Phase 2 results : %s", output_dir / "phase2_results.json")
    logger.info(" Gen samples : %s", output_dir / "generation_samples.json")
    logger.info(" Report : %s", report_path or "N/A (generation failed)")

    if phase1_results:
        p1_ok = sum(1 for v in phase1_results.values()
                    if not (isinstance(v, dict) and "error" in v))
        p1_fail = len(phase1_results) - p1_ok
        logger.info(" Phase 1 tasks : %d OK / %d failed", p1_ok, p1_fail)

    if phase2_results:
        # The nested "5shot" entry is a whole result set, not a single task.
        p2_entries = {k: v for k, v in phase2_results.items() if k != "5shot"}
        p2_ok = sum(1 for v in p2_entries.values()
                    if not (isinstance(v, dict) and "error" in v))
        p2_fail = len(p2_entries) - p2_ok
        logger.info(" Phase 2 tasks : %d OK / %d failed", p2_ok, p2_fail)

    logger.info(_bar())
|
|
|
|
# Start the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|