| """ |
| lm_eval_task.py — lm-evaluation-harness integration task. |
| |
| Top-level function for ProcessPoolExecutor (spawn) compatibility: |
| - run_lm_eval_tasks(hf_model_path, tasks, device, num_fewshot=0) -> dict |
| |
| Requires: lm_eval >= 0.4 (installed as lm-eval 0.4.11) |
| """ |
| from __future__ import annotations |
|
|
| import logging |
| import os |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Any |
|
|
| _PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent |
| if str(_PROJECT_ROOT) not in sys.path: |
| sys.path.insert(0, str(_PROJECT_ROOT)) |
|
|
| CHECKPOINT = str(_PROJECT_ROOT / "checkpoints" / "korean_3b_fp8_run1" / "checkpoint-0057000") |
| TOKENIZER_PATH = str(_PROJECT_ROOT / "tokenizer" / "korean_sp" / "tokenizer.json") |
| DATA_DIR = _PROJECT_ROOT / "data" |
| SEQ_LEN = 2048 |
| STRIDE = 512 |
| BATCH_SIZE = 32 |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
| def run_lm_eval_tasks( |
| hf_model_path: str, |
| tasks: list[str], |
| device: str, |
| num_fewshot: int = 0, |
| ) -> dict: |
| """Run lm-evaluation-harness benchmarks on a HuggingFace-format model. |
| |
| Isolates a single GPU via CUDA_VISIBLE_DEVICES so the function is safe |
| to run in a ProcessPoolExecutor worker without VRAM conflicts. |
| |
| Args: |
| hf_model_path: Path to a HuggingFace-compatible model directory |
| (must contain config.json + safetensors/pytorch_model). |
| tasks: List of lm-eval task names, e.g. |
| ["hellaswag", "arc_easy", "piqa"]. |
| Unknown tasks are skipped with a warning. |
| device: CUDA device string, e.g. "cuda:7". |
| The function maps this to CUDA_VISIBLE_DEVICES=7 and |
| then uses device="cuda:0" inside lm_eval. |
| num_fewshot: Number of few-shot examples (0 = zero-shot). |
| |
| Returns: |
| Dict with keys: |
| - model_path: hf_model_path as provided |
| - tasks_requested: original task list |
| - tasks_evaluated: tasks that were actually run |
| - tasks_skipped: tasks that were not available / errored |
| - per_task_metrics: dict mapping task name to metric sub-dict |
| - raw_results: full results dict from lm_eval.simple_evaluate |
| - elapsed_sec: wall-clock time for the evaluation |
| """ |
| |
| gpu_index = int(device.split(":")[-1]) |
| os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_index) |
| |
| _internal_device = "cuda:0" |
|
|
| print( |
| f"[LM_EVAL] Starting on {device} " |
| f"(CUDA_VISIBLE_DEVICES={gpu_index}), tasks={tasks}, " |
| f"num_fewshot={num_fewshot}" |
| ) |
|
|
| |
| try: |
| import lm_eval |
| from lm_eval.tasks import TaskManager |
|
|
| task_manager = TaskManager() |
| available_tasks: set[str] = set(task_manager.all_tasks) |
| except Exception as exc: |
| logger.warning(f"[LM_EVAL] Could not enumerate available tasks: {exc}") |
| available_tasks = set() |
|
|
| valid_tasks: list[str] = [] |
| skipped_tasks: list[str] = [] |
|
|
| for t in tasks: |
| if (not available_tasks) or (t in available_tasks): |
| valid_tasks.append(t) |
| else: |
| logger.warning(f"[LM_EVAL] Task '{t}' not found in lm_eval registry — skipping.") |
| skipped_tasks.append(t) |
|
|
| if not valid_tasks: |
| print("[LM_EVAL] No valid tasks to evaluate.") |
| return { |
| "model_path": hf_model_path, |
| "tasks_requested": tasks, |
| "tasks_evaluated": [], |
| "tasks_skipped": skipped_tasks, |
| "per_task_metrics": {}, |
| "raw_results": {}, |
| "elapsed_sec": 0.0, |
| } |
|
|
| |
| t0 = time.time() |
| raw_results: dict[str, Any] = {} |
| evaluated_tasks: list[str] = [] |
| error_tasks: list[str] = [] |
|
|
| |
| try: |
| print( |
| f"[LM_EVAL] Evaluating {len(valid_tasks)} task(s) together: {valid_tasks}" |
| ) |
| raw_results = lm_eval.simple_evaluate( |
| model="hf", |
| model_args=( |
| f"pretrained={hf_model_path}," |
| f"dtype=bfloat16," |
| f"device={_internal_device}" |
| ), |
| tasks=valid_tasks, |
| num_fewshot=num_fewshot, |
| batch_size="auto", |
| ) |
| evaluated_tasks = list(valid_tasks) |
|
|
| except Exception as exc: |
| logger.warning( |
| f"[LM_EVAL] Batch evaluation failed ({exc}). " |
| "Falling back to per-task evaluation." |
| ) |
| |
| for task_name in valid_tasks: |
| try: |
| print(f"[LM_EVAL] Evaluating task '{task_name}' individually...") |
| task_result = lm_eval.simple_evaluate( |
| model="hf", |
| model_args=( |
| f"pretrained={hf_model_path}," |
| f"dtype=bfloat16," |
| f"device={_internal_device}" |
| ), |
| tasks=[task_name], |
| num_fewshot=num_fewshot, |
| batch_size="auto", |
| device=_internal_device, |
| ) |
| |
| if not raw_results: |
| raw_results = task_result |
| else: |
| if "results" in task_result and "results" in raw_results: |
| raw_results["results"].update(task_result.get("results", {})) |
| evaluated_tasks.append(task_name) |
| except Exception as task_exc: |
| logger.warning( |
| f"[LM_EVAL] Task '{task_name}' failed: {task_exc}" |
| ) |
| error_tasks.append(task_name) |
|
|
| skipped_tasks.extend(error_tasks) |
| elapsed = time.time() - t0 |
|
|
| |
| |
| |
| |
| per_task_metrics: dict[str, dict] = {} |
| lm_results: dict[str, Any] = raw_results.get("results", {}) |
|
|
| for task_name, task_data in lm_results.items(): |
| if not isinstance(task_data, dict): |
| continue |
| metrics: dict[str, Any] = {} |
| for key, value in task_data.items(): |
| |
| if key in ("alias", "group"): |
| continue |
| metrics[key] = value |
| per_task_metrics[task_name] = metrics |
|
|
| |
| for task_name in evaluated_tasks: |
| if task_name not in per_task_metrics: |
| logger.warning( |
| f"[LM_EVAL] Task '{task_name}' not found in results dict after evaluation." |
| ) |
|
|
| |
| print(f"[LM_EVAL] Evaluation complete in {elapsed:.1f}s") |
| for task_name, metrics in per_task_metrics.items(): |
| |
| acc = metrics.get("acc,none") or metrics.get("acc") or metrics.get("accuracy") |
| acc_norm = metrics.get("acc_norm,none") or metrics.get("acc_norm") |
| if acc is not None: |
| line = f" {task_name}: acc={acc:.4f}" |
| if acc_norm is not None: |
| line += f", acc_norm={acc_norm:.4f}" |
| print(f"[LM_EVAL] {line}") |
| else: |
| print(f"[LM_EVAL] {task_name}: {metrics}") |
|
|
| if skipped_tasks: |
| print(f"[LM_EVAL] Skipped tasks: {skipped_tasks}") |
|
|
| return { |
| "model_path": hf_model_path, |
| "tasks_requested": tasks, |
| "tasks_evaluated": evaluated_tasks, |
| "tasks_skipped": skipped_tasks, |
| "per_task_metrics": per_task_metrics, |
| "raw_results": raw_results, |
| "elapsed_sec": round(elapsed, 1), |
| } |
|
|
|
|
| |
| |
| |
|
|
| def _extract_per_task_metrics(raw_results: dict) -> dict[str, dict]: |
| """Extract per-task metrics from lm_eval raw results.""" |
| per_task_metrics: dict[str, dict] = {} |
| lm_results: dict[str, Any] = raw_results.get("results", {}) |
| for task_name, task_data in lm_results.items(): |
| if not isinstance(task_data, dict): |
| continue |
| metrics = {k: v for k, v in task_data.items() if k not in ("alias", "group")} |
| per_task_metrics[task_name] = metrics |
| return per_task_metrics |
|
|
|
|
| def run_lm_eval_tasks_pipeline( |
| hf_model_path: str, |
| tasks: list[str], |
| device: str, |
| fewshot_values: list[int], |
| output_dir: str = "", |
| output_prefix: str = "", |
| ) -> dict: |
| """Run lm-eval with multiple fewshot settings, loading the model ONCE. |
| |
| This avoids the overhead of loading the model N times when running |
| 0-shot then 5-shot on the same GPU. |
| |
| Returns: |
| Dict with keys like "0shot", "5shot", each containing the same |
| structure as run_lm_eval_tasks(). |
| """ |
| import json as _json |
|
|
| import lm_eval |
| from lm_eval.models.huggingface import HFLM |
|
|
| |
| gpu_index = int(device.split(":")[-1]) |
| os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_index) |
| _internal_device = "cuda:0" |
|
|
| print( |
| f"[LM_EVAL_PIPELINE] Loading model once on {device} " |
| f"for fewshot={fewshot_values}, tasks={tasks}", |
| flush=True, |
| ) |
|
|
| |
| model_obj = HFLM( |
| pretrained=hf_model_path, |
| dtype="bfloat16", |
| device=_internal_device, |
| batch_size="auto", |
| ) |
|
|
| |
| try: |
| from lm_eval.tasks import TaskManager |
| available_tasks = set(TaskManager().all_tasks) |
| except Exception: |
| available_tasks = set() |
|
|
| valid_tasks = [t for t in tasks if (not available_tasks) or (t in available_tasks)] |
| skipped_tasks = [t for t in tasks if t not in valid_tasks] |
|
|
| if not valid_tasks: |
| print("[LM_EVAL_PIPELINE] No valid tasks.", flush=True) |
| empty = { |
| "model_path": hf_model_path, |
| "tasks_requested": tasks, |
| "tasks_evaluated": [], |
| "tasks_skipped": skipped_tasks, |
| "per_task_metrics": {}, |
| "raw_results": {}, |
| "elapsed_sec": 0.0, |
| } |
| return {f"{n}shot": empty for n in fewshot_values} |
|
|
| |
| all_results: dict[str, Any] = {} |
|
|
| for num_fewshot in fewshot_values: |
| print( |
| f"[LM_EVAL_PIPELINE] Running {num_fewshot}-shot on {valid_tasks}...", |
| flush=True, |
| ) |
| t0 = time.time() |
|
|
| try: |
| raw_results = lm_eval.simple_evaluate( |
| model=model_obj, |
| tasks=valid_tasks, |
| num_fewshot=num_fewshot, |
| ) |
| per_task_metrics = _extract_per_task_metrics(raw_results) |
| elapsed = time.time() - t0 |
|
|
| shot_result = { |
| "model_path": hf_model_path, |
| "tasks_requested": tasks, |
| "tasks_evaluated": list(valid_tasks), |
| "tasks_skipped": list(skipped_tasks), |
| "per_task_metrics": per_task_metrics, |
| "raw_results": raw_results, |
| "elapsed_sec": round(elapsed, 1), |
| } |
| print( |
| f"[LM_EVAL_PIPELINE] {num_fewshot}-shot complete in {elapsed:.1f}s", |
| flush=True, |
| ) |
|
|
| except Exception as exc: |
| elapsed = time.time() - t0 |
| shot_result = { |
| "model_path": hf_model_path, |
| "tasks_requested": tasks, |
| "tasks_evaluated": [], |
| "tasks_skipped": list(tasks), |
| "per_task_metrics": {}, |
| "raw_results": {}, |
| "elapsed_sec": round(elapsed, 1), |
| "error": str(exc), |
| } |
| print( |
| f"[LM_EVAL_PIPELINE] {num_fewshot}-shot FAILED: {exc}", |
| flush=True, |
| ) |
|
|
| all_results[f"{num_fewshot}shot"] = shot_result |
|
|
| |
| if output_dir: |
| shot_path = Path(output_dir) / f"{output_prefix}_{num_fewshot}shot.json" |
| with open(shot_path, "w", encoding="utf-8") as f: |
| _json.dump(shot_result, f, ensure_ascii=False, indent=2, default=str) |
|
|
| return all_results |
|
|