| """ |
| FRANKENSTALLM 3B — SFT Evaluation Pipeline Orchestrator |
| ========================================================= |
| |
| Evaluates the SFT checkpoint across 6 dimensions and generates a |
| comparison report against the Base model results. |
| |
| Runs 4 phases sequentially: |
| Phase 0 — Convert SFT checkpoint to HuggingFace format |
| Phase 1 — Internal evaluation across 8 GPUs (PPL, Calibration, Generation) |
| Phase 2 — Standard benchmarks via lm-eval-harness (8 GPU parallel) |
| Phase 3 — Base vs SFT comparison report generation |
| |
| Usage: |
| python eval/sft_eval_pipeline.py |
| python eval/sft_eval_pipeline.py --dry-run |
| python eval/sft_eval_pipeline.py --skip-phase0 --skip-phase2 |
| python eval/sft_eval_pipeline.py --skip-phase0 --hf-model-path eval/outputs/hf_3b_sft_best |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import logging |
| import multiprocessing as mp |
| import os |
| import sys |
| import time |
| import traceback |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| |
| |
| |
# Repository root: two directory levels above this file (the docstring's usage
# lines run it as ``eval/sft_eval_pipeline.py``).  It is put at the front of
# ``sys.path`` so the ``eval.*`` imports further down resolve.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
if sys.path.count(os.fspath(_PROJECT_ROOT)) == 0:
    sys.path.insert(0, os.fspath(_PROJECT_ROOT))

# Default SFT checkpoint evaluated by this pipeline (kept as a plain string).
SFT_CHECKPOINT = os.fspath(
    _PROJECT_ROOT.joinpath("checkpoints", "korean_3b_sft_v2", "checkpoint-best")
)

# Tokenizer stored next to the SFT checkpoints; preferred when it exists.
SFT_TOKENIZER = os.fspath(
    _PROJECT_ROOT.joinpath("checkpoints", "korean_3b_sft_v2", "tokenizer.json")
)

# Tokenizer used when SFT_TOKENIZER is missing on disk.
_FALLBACK_TOKENIZER = os.fspath(
    _PROJECT_ROOT.joinpath("tokenizer", "korean_sp", "tokenizer.json")
)

# Directory holding the Base-model evaluation results used for comparison.
BASE_RESULTS_DIR = _PROJECT_ROOT.joinpath(
    "eval", "outputs", "3b_reeval_20260305_1451"
)
|
|
| |
| |
| |
| from eval.full_eval_pipeline import ( |
| _bar, |
| _build_phase1_tasks, |
| _build_phase2_tasks, |
| _fmt_seconds, |
| _make_output_dir, |
| _NUMA_CORES, |
| _print_banner, |
| _print_phase_header, |
| _save_json, |
| _spawn_task, |
| _wait_and_collect, |
| run_phase0, |
| SEQ_LEN, |
| STRIDE, |
| BATCH_SIZE, |
| DATA_DIR, |
| ) |
|
|
| |
| |
| |
# Module-wide logger used by every function in this file.
logger = logging.getLogger("sft_eval")

# Root logging configuration: INFO level, timestamped "[LEVEL] message" lines.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
|
|
|
|
| |
| |
| |
|
|
def _spawn_sft_task(
    task_name: str,
    gpu_id: int,
    output_path: Path,
    label: str,
    checkpoint: str,
    tokenizer: str,
    use_chat_template: bool = False,
    extra_args: Optional[Dict[str, str]] = None,
) -> tuple:
    """Launch one ``eval/tasks/task_runner.py`` subprocess pinned to a GPU.

    The SFT checkpoint and tokenizer are handed to the child through the
    ``EVAL_CHECKPOINT`` / ``EVAL_TOKENIZER`` environment variables, and the
    child only sees ``gpu_id`` via ``CUDA_VISIBLE_DEVICES``.

    Args:
        task_name: Value passed to the runner's ``--task`` option.
        gpu_id: GPU index; passed as ``--gpu-id`` and ``CUDA_VISIBLE_DEVICES``.
        output_path: Passed as ``--output``. The child's combined
            stdout/stderr goes to the same path with a ``.log`` suffix.
        label: Human-readable task name used in logs and returned unchanged.
        checkpoint: Path exported as ``EVAL_CHECKPOINT``.
        tokenizer: Path exported as ``EVAL_TOKENIZER``.
        use_chat_template: When true, ``USE_CHAT_TEMPLATE=1`` is exported;
            when false, the variable is removed from the child environment.
        extra_args: Optional ``{flag: value}`` pairs appended to the command.

    Returns:
        ``(process, label, output_path, log_file)``. The open ``log_file``
        handle is owned by the caller from here on (presumably closed by
        ``_wait_and_collect`` - confirm against that helper).

    Raises:
        OSError: If the log file cannot be opened or the subprocess cannot
            be started.
    """
    import subprocess

    cmd = [
        sys.executable,
        str(_PROJECT_ROOT / "eval" / "tasks" / "task_runner.py"),
        "--task", task_name,
        "--gpu-id", str(gpu_id),
        "--output", str(output_path),
    ]
    if extra_args:
        for flag, value in extra_args.items():
            # Popen only accepts string arguments; coerce defensively.
            cmd.extend([flag, str(value)])

    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    env["EVAL_CHECKPOINT"] = checkpoint
    env["EVAL_TOKENIZER"] = tokenizer
    if use_chat_template:
        env["USE_CHAT_TEMPLATE"] = "1"
    else:
        # The flag is authoritative: a USE_CHAT_TEMPLATE value inherited from
        # the parent shell must not leak into tasks that asked for it off.
        env.pop("USE_CHAT_TEMPLATE", None)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    log_path = output_path.with_suffix(".log")
    log_file = open(log_path, "w", encoding="utf-8")

    logger.info(" Spawning: %s (GPU %d) [SFT]", label, gpu_id)
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=log_file,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=str(_PROJECT_ROOT),
        )
    except BaseException:
        # Nobody else holds the handle yet, so close it before re-raising.
        log_file.close()
        raise
    return proc, label, output_path, log_file
|
|
|
|
| |
| |
| |
|
|
def run_sft_phase1(
    output_dir: Path,
    gpu_ids: List[int],
    checkpoint: str,
    tokenizer: str,
) -> Dict[str, Any]:
    """Run the Phase 1 internal-evaluation tasks against the SFT checkpoint.

    One subprocess is spawned per descriptor from ``_build_phase1_tasks``;
    the ``generation`` and ``repetition_grid`` tasks run with the chat
    template enabled. The collected results are written to
    ``phase1_results.json``. Successful generation / repetition results are
    additionally written to ``generation_samples.json``.

    Args:
        output_dir: Directory receiving per-task and aggregate JSON files.
        gpu_ids: GPU indices handed to ``_build_phase1_tasks``.
        checkpoint: SFT checkpoint path forwarded to every task.
        tokenizer: Tokenizer path forwarded to every task.

    Returns:
        The mapping returned by ``_wait_and_collect`` (keyed by task label).
    """
    chat_template_tasks = ("generation", "repetition_grid")

    spawned = []
    for task in _build_phase1_tasks(gpu_ids):
        name = task["task"]
        gpu = task["gpu_id"]
        spawned.append(
            _spawn_sft_task(
                task_name=name,
                gpu_id=gpu,
                output_path=output_dir / f"phase1_{name}_gpu{gpu}.json",
                label=task["label"],
                checkpoint=checkpoint,
                tokenizer=tokenizer,
                use_chat_template=name in chat_template_tasks,
                extra_args=task.get("extra_args"),
            )
        )

    results = _wait_and_collect(spawned)

    aggregate_path = output_dir / "phase1_results.json"
    _save_json(results, aggregate_path)
    logger.info(" Phase 1 results saved: %s", aggregate_path)

    # Collect the generation-style outputs separately for the report.
    samples: Dict[str, Any] = {}
    for task_label, outcome in results.items():
        if not isinstance(outcome, dict) or "error" in outcome:
            continue  # failed or malformed task: nothing to keep
        if "Generation" in task_label:
            samples["generation"] = outcome
        elif "Repetition" in task_label:
            samples["repetition_grid"] = outcome

    if samples:
        samples_path = output_dir / "generation_samples.json"
        _save_json(samples, samples_path)
        logger.info(" Generation samples saved: %s", samples_path)

    return results
|
|
|
|
| |
| |
| |
|
|
def _spawn_sft_phase2_batch(
    hf_model_path: Path,
    output_dir: Path,
    gpu_task_list: list,
    num_fewshot: int,
    label_suffix: str,
    checkpoint: str,
    tokenizer: str,
) -> Dict[str, Any]:
    """Launch one ``lm_eval`` subprocess per GPU entry and wait for them all.

    Args:
        hf_model_path: HuggingFace-format model passed as ``--hf-model-path``.
        output_dir: Directory receiving the per-GPU result files.
        gpu_task_list: Iterable of ``(gpu_id, task_names, label)`` triples.
        num_fewshot: Few-shot count, passed as ``--num-fewshot`` and embedded
            in the label and the output file name.
        label_suffix: Extra suffix appended to each output file name.
        checkpoint: SFT checkpoint path forwarded through the environment.
        tokenizer: Tokenizer path forwarded through the environment.

    Returns:
        Whatever ``_wait_and_collect`` returns for the spawned processes.
    """
    spawned = [
        _spawn_sft_task(
            task_name="lm_eval",
            gpu_id=gpu,
            output_path=output_dir / f"phase2_gpu{gpu}_{num_fewshot}shot{label_suffix}.json",
            label=f"[{num_fewshot}-shot] {group_label}",
            checkpoint=checkpoint,
            tokenizer=tokenizer,
            extra_args={
                "--hf-model-path": str(hf_model_path),
                "--lm-eval-tasks": ",".join(benchmarks),
                "--num-fewshot": str(num_fewshot),
            },
        )
        for gpu, benchmarks, group_label in gpu_task_list
    ]
    return _wait_and_collect(spawned)
|
|
|
|
def run_sft_phase2(
    hf_model_path: Path,
    output_dir: Path,
    gpu_ids: List[int],
    checkpoint: str,
    tokenizer: str,
) -> Dict[str, Any]:
    """Run the lm-eval benchmarks for the SFT model at 0-shot, then 5-shot.

    The 0-shot results form the returned mapping. The 5-shot pass is
    best-effort: its results (or ``{"error": <traceback>}`` if it raised)
    are stored under the ``"5shot"`` key. The combined mapping is written to
    ``phase2_results.json``.

    Args:
        hf_model_path: HuggingFace-format model to benchmark.
        output_dir: Directory receiving per-GPU and aggregate JSON files.
        gpu_ids: GPU indices handed to ``_build_phase2_tasks``.
        checkpoint: SFT checkpoint path forwarded to the subprocesses.
        tokenizer: Tokenizer path forwarded to the subprocesses.

    Returns:
        The 0-shot results mapping with the extra ``"5shot"`` entry.
    """
    plan = _build_phase2_tasks(gpu_ids)

    logger.info(" Running 0-shot benchmarks on %d GPUs ...", len(gpu_ids))
    results = _spawn_sft_phase2_batch(
        hf_model_path=hf_model_path,
        output_dir=output_dir,
        gpu_task_list=plan,
        num_fewshot=0,
        label_suffix="",
        checkpoint=checkpoint,
        tokenizer=tokenizer,
    )
    logger.info(" Phase 2 (0-shot) complete.")

    # A 5-shot failure must not discard the 0-shot numbers gathered above.
    logger.info(" Attempting 5-shot benchmarks ...")
    try:
        few_shot = _spawn_sft_phase2_batch(
            hf_model_path=hf_model_path,
            output_dir=output_dir,
            gpu_task_list=plan,
            num_fewshot=5,
            label_suffix="_5shot",
            checkpoint=checkpoint,
            tokenizer=tokenizer,
        )
        logger.info(" Phase 2 (5-shot) complete.")
    except Exception:
        failure_trace = traceback.format_exc()
        logger.warning(" 5-shot failed (non-fatal): %s", failure_trace)
        few_shot = {"error": failure_trace}
    results["5shot"] = few_shot

    aggregate_path = output_dir / "phase2_results.json"
    _save_json(results, aggregate_path)
    logger.info(" Phase 2 results saved: %s", aggregate_path)
    return results
|
|
|
|
| |
| |
| |
|
|
def run_sft_phase3(
    phase1_results: Dict[str, Any],
    phase2_results: Dict[str, Any],
    output_dir: Path,
    base_results_dir: Path,
    total_elapsed_sec: float,
) -> Optional[Path]:
    """Produce the Base-vs-SFT comparison report, with a raw JSON fallback.

    The report is requested from ``eval.report_generator`` and targets
    ``reports/<YYYY-MM-DD>_3B_SFT_V2_EVALUATION_REPORT.md`` under the project
    root. If anything in that step raises, the error is logged and the raw
    phase results are dumped to ``sft_eval_summary.json`` in ``output_dir``.

    Args:
        phase1_results: Phase 1 results mapping for the SFT model.
        phase2_results: Phase 2 results mapping for the SFT model.
        output_dir: SFT evaluation output directory.
        base_results_dir: Directory holding the Base model's results.
        total_elapsed_sec: Pipeline wall-clock time so far, in seconds.

    Returns:
        The value returned by ``generate_comparison_report`` on success,
        otherwise ``None``.
    """
    try:
        # Imported lazily so a broken report module cannot stop Phases 0-2.
        from eval.report_generator import generate_comparison_report

        date_tag = datetime.now().strftime('%Y-%m-%d')
        report_target = (
            _PROJECT_ROOT / "reports" / f"{date_tag}_3B_SFT_V2_EVALUATION_REPORT.md"
        )
        written_to = generate_comparison_report(
            base_results_dir=base_results_dir,
            sft_phase1_results=phase1_results,
            sft_phase2_results=phase2_results,
            output_path=report_target,
            sft_output_dir=output_dir,
            total_elapsed_sec=total_elapsed_sec,
        )
        logger.info(" Comparison report saved: %s", written_to)
        return written_to
    except Exception:
        logger.error(" Phase 3 report generation failed:\n%s", traceback.format_exc())

    # Report generation failed: keep the raw numbers so the run is not lost.
    summary_path = output_dir / "sft_eval_summary.json"
    raw_payload = {
        "phase1": phase1_results,
        "phase2": phase2_results,
    }
    _save_json(raw_payload, summary_path)
    logger.info(" Fallback summary saved: %s", summary_path)
    return None
|
|
|
|
| |
| |
| |
|
|
def parse_args() -> argparse.Namespace:
    """Parse the pipeline's command-line options from ``sys.argv``.

    Boolean switches: ``--dry-run``, ``--skip-phase0``, ``--skip-phase1``,
    ``--skip-phase2``. String options (all defaulting to ``None``):
    ``--checkpoint``, ``--hf-model-path``, ``--output-dir``,
    ``--base-results``, ``--gpus``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser(
        description="FRANKENSTALLM 3B β SFT Evaluation Pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # (flag, help) pairs, registered in the order they appear in --help.
    switches = (
        ("--dry-run", None),
        ("--skip-phase0", "Skip HF conversion (reuse existing)."),
        ("--skip-phase1", "Skip internal eval."),
        ("--skip-phase2", "Skip lm-eval benchmarks."),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    value_options = (
        ("--checkpoint", f"Override SFT checkpoint (default: {SFT_CHECKPOINT})"),
        ("--hf-model-path", "Pre-converted HF model path (skips Phase 0)."),
        ("--output-dir", "Override output directory."),
        ("--base-results", f"Base eval results dir (default: {BASE_RESULTS_DIR})"),
        ("--gpus", "Comma-separated GPU IDs (default: 0-7)."),
    )
    for flag, help_text in value_options:
        cli.add_argument(flag, type=str, default=None, help=help_text)

    return cli.parse_args()
|
|
|
|
| |
| |
| |
|
|
def _parse_gpu_ids(spec: Optional[str]) -> List[int]:
    """Turn a ``--gpus`` value into a sorted list of unique GPU ids.

    ``None`` or an empty string selects the default GPUs 0-7. Empty tokens
    (e.g. the trailing comma in ``"0,1,"``) are ignored and duplicates are
    collapsed. A non-integer token still raises ``ValueError``.
    """
    if not spec:
        return list(range(8))
    tokens = (tok.strip() for tok in spec.split(","))
    return sorted({int(tok) for tok in tokens if tok})


def _log_dry_run_plan(
    checkpoint: str,
    tokenizer: str,
    base_results_dir: Path,
    output_dir: Path,
    gpu_ids: List[int],
) -> None:
    """Log the resolved configuration and planned tasks without running them."""
    _print_banner("DRY RUN β SFT Eval Pipeline")
    logger.info(" SFT Checkpoint : %s", checkpoint)
    logger.info(" Tokenizer : %s", tokenizer)
    logger.info(" Base Results : %s", base_results_dir)
    logger.info(" Output dir : %s", output_dir)
    logger.info(" GPUs : %s", gpu_ids)
    logger.info(" Chat template : ENABLED for generation tasks")
    logger.info("")

    phase1_tasks = _build_phase1_tasks(gpu_ids)
    logger.info(" Phase 1 Tasks (%d):", len(phase1_tasks))
    for desc in phase1_tasks:
        is_gen = desc["task"] in ("generation", "repetition_grid")
        chat_mark = " [CHAT]" if is_gen else ""
        logger.info(" GPU %d β %s%s", desc["gpu_id"], desc["label"], chat_mark)

    phase2_tasks = _build_phase2_tasks(gpu_ids)
    logger.info(" Phase 2 Tasks (%d):", len(phase2_tasks))
    for gpu_id, _tasks, label in phase2_tasks:
        logger.info(" GPU %d β %s", gpu_id, label)

    # Report whether the Base results needed for the comparison are present.
    if base_results_dir.exists():
        p1_file = base_results_dir / "phase1_results.json"
        p2_file = base_results_dir / "phase2_results.json"
        logger.info(" Base phase1_results.json: %s", "OK" if p1_file.exists() else "MISSING")
        logger.info(" Base phase2_results.json: %s", "OK" if p2_file.exists() else "MISSING")
    else:
        logger.warning(" Base results dir NOT FOUND: %s", base_results_dir)


def _find_converted_hf_model(output_dir: Path) -> Optional[Path]:
    """Locate an already-converted HF model for ``--skip-phase0``.

    Prefers ``<output_dir>/hf_3b_sft_best``. Otherwise picks the most
    recently modified ``hf_3b_sft*`` entry directly under ``eval/outputs``.
    ``glob`` yields entries in arbitrary order, so the choice is made
    explicitly rather than by taking the first hit.
    """
    local = output_dir / "hf_3b_sft_best"
    if local.exists():
        return local
    outputs_dir = _PROJECT_ROOT / "eval" / "outputs"
    candidates = list(outputs_dir.glob("hf_3b_sft*"))
    if not candidates:
        return None
    return max(candidates, key=lambda p: (p.stat().st_mtime, p.name))


def _convert_sft_checkpoint(
    checkpoint: str, tokenizer: str, output_dir: Path
) -> Optional[Path]:
    """Run ``scripts/convert_to_hf.py``; return the HF dir or ``None`` on failure."""
    import subprocess

    started = time.time()
    hf_output = output_dir / "hf_3b_sft_best"
    try:
        hf_output.mkdir(parents=True, exist_ok=True)
        convert_script = _PROJECT_ROOT / "scripts" / "convert_to_hf.py"
        cmd = [
            sys.executable, str(convert_script),
            "--checkpoint", checkpoint,
            "--output", str(hf_output),
            "--tokenizer", tokenizer,
        ]
        logger.info(" Running: %s", " ".join(cmd))
        subprocess.run(cmd, check=True, cwd=str(_PROJECT_ROOT))
    except Exception:
        # Phase 0 is best-effort: the caller skips Phase 2 when this is None.
        logger.error(" Phase 0 FAILED:\n%s", traceback.format_exc())
        return None
    logger.info(" Phase 0 complete in %s.", _fmt_seconds(time.time() - started))
    return hf_output


def _load_saved_results(path: Path, phase_label: str) -> Dict[str, Any]:
    """Load a previous run's results JSON, or return ``{}`` if it is absent."""
    if not path.exists():
        return {}
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    logger.info(" Loaded existing %s results.", phase_label)
    return data


def main() -> None:
    """Run the SFT evaluation pipeline (Phases 0-3) from the command line.

    Resolves configuration from CLI arguments and module defaults. With
    ``--dry-run`` it only logs the plan and exits with status 0, without
    creating the output directory. Otherwise it checks that the checkpoint
    and tokenizer exist (exit status 1 if not), then runs:

    * Phase 0 - obtain a HuggingFace-format model (given, reused, or converted);
    * Phase 1 - internal evaluation, or reload of saved results when skipped;
    * Phase 2 - lm-eval benchmarks, skipped when no HF model is available;
    * Phase 3 - Base vs SFT comparison report.

    Failures in Phases 0-2 are logged and the pipeline continues with
    whatever results it has.
    """
    try:
        mp.set_start_method("spawn", force=True)
    except RuntimeError:
        logger.debug(" Could not set multiprocessing start method; keeping current one.")

    args = parse_args()

    # Resolve paths
    checkpoint = args.checkpoint or SFT_CHECKPOINT
    tokenizer = SFT_TOKENIZER if Path(SFT_TOKENIZER).exists() else _FALLBACK_TOKENIZER
    base_results_dir = Path(args.base_results) if args.base_results else BASE_RESULTS_DIR

    # Output directory (created only once we know a real run will happen)
    if args.output_dir:
        output_dir = Path(args.output_dir)
    else:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M")
        output_dir = _PROJECT_ROOT / "eval" / "outputs" / f"3b_sft_eval_{timestamp}"

    gpu_ids = _parse_gpu_ids(args.gpus)
    if not gpu_ids:
        logger.error("No GPU IDs could be parsed from --gpus: %r", args.gpus)
        sys.exit(1)

    if args.dry_run:
        _log_dry_run_plan(checkpoint, tokenizer, base_results_dir, output_dir, gpu_ids)
        sys.exit(0)

    _print_banner("FRANKENSTALLM 3B β SFT Evaluation Pipeline")
    logger.info(" SFT Checkpoint : %s", checkpoint)
    logger.info(" Tokenizer : %s", tokenizer)
    logger.info(" Base Results : %s", base_results_dir)
    logger.info(" Output dir : %s", output_dir)
    logger.info(" GPUs : %s", gpu_ids)
    logger.info(" Phases : phase0=%s phase1=%s phase2=%s",
                "skip" if args.skip_phase0 else "run",
                "skip" if args.skip_phase1 else "run",
                "skip" if args.skip_phase2 else "run")

    # Preflight checks
    if not Path(checkpoint).exists():
        logger.error("SFT checkpoint not found: %s", checkpoint)
        sys.exit(1)
    if not Path(tokenizer).exists():
        logger.error("Tokenizer not found: %s", tokenizer)
        sys.exit(1)
    logger.info(" Preflight OK: checkpoint=%s, tokenizer=%s", checkpoint, tokenizer)

    # Inputs are valid: now it is safe to create the output directory.
    output_dir.mkdir(parents=True, exist_ok=True)

    pipeline_start = time.time()
    phase1_results: Dict[str, Any] = {}
    phase2_results: Dict[str, Any] = {}
    hf_model_path: Optional[Path] = None

    # ---- Phase 0: HF conversion -------------------------------------------
    _print_phase_header("PHASE 0", "SFT Checkpoint β HuggingFace Conversion")
    if args.hf_model_path:
        hf_model_path = Path(args.hf_model_path)
        logger.info(" Using pre-converted HF model: %s", hf_model_path)
    elif args.skip_phase0:
        hf_model_path = _find_converted_hf_model(output_dir)
        if hf_model_path is not None:
            logger.info(" Skipping Phase 0 β reusing: %s", hf_model_path)
        else:
            logger.warning(" No HF model found. Phase 2 will be skipped.")
    else:
        hf_model_path = _convert_sft_checkpoint(checkpoint, tokenizer, output_dir)

    # ---- Phase 1: internal evaluation -------------------------------------
    _print_phase_header("PHASE 1", f"SFT Internal Evaluation β {len(gpu_ids)} GPU Parallel")
    if args.skip_phase1:
        logger.info(" Skipping Phase 1.")
        phase1_results = _load_saved_results(output_dir / "phase1_results.json", "Phase 1")
    else:
        t0 = time.time()
        try:
            phase1_results = run_sft_phase1(output_dir, gpu_ids, checkpoint, tokenizer)
            logger.info(" Phase 1 complete in %s.", _fmt_seconds(time.time() - t0))
        except Exception:
            logger.error(" Phase 1 FAILED:\n%s", traceback.format_exc())

    # ---- Phase 2: lm-eval benchmarks --------------------------------------
    _print_phase_header("PHASE 2", f"SFT Benchmarks β {len(gpu_ids)} GPU Parallel")
    if args.skip_phase2:
        logger.info(" Skipping Phase 2.")
        phase2_results = _load_saved_results(output_dir / "phase2_results.json", "Phase 2")
    elif hf_model_path is None:
        logger.warning(" Phase 2 skipped β HF model unavailable.")
    else:
        t0 = time.time()
        try:
            phase2_results = run_sft_phase2(
                hf_model_path, output_dir, gpu_ids, checkpoint, tokenizer,
            )
            logger.info(" Phase 2 complete in %s.", _fmt_seconds(time.time() - t0))
        except Exception:
            logger.error(" Phase 2 FAILED:\n%s", traceback.format_exc())

    # ---- Phase 3: comparison report ---------------------------------------
    _print_phase_header("PHASE 3", "Base vs SFT Comparison Report")
    t0 = time.time()
    report_path = run_sft_phase3(
        phase1_results, phase2_results, output_dir, base_results_dir,
        total_elapsed_sec=time.time() - pipeline_start,
    )
    logger.info(" Phase 3 complete in %s.", _fmt_seconds(time.time() - t0))

    # ---- Summary ------------------------------------------------------------
    total_elapsed = time.time() - pipeline_start
    _print_banner("SFT EVALUATION PIPELINE COMPLETE")
    logger.info(" Total time : %s", _fmt_seconds(total_elapsed))
    logger.info(" Output dir : %s", output_dir)
    logger.info(" Phase 1 results : %s", output_dir / "phase1_results.json")
    logger.info(" Phase 2 results : %s", output_dir / "phase2_results.json")
    logger.info(" Report : %s", report_path or "N/A")
    logger.info(_bar())
|
|
|
|
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|