frankenstallm / source /eval /tasks /lm_eval_task.py
pathcosmos's picture
Upload folder using huggingface_hub (#29)
5b1ff4d
"""
lm_eval_task.py — lm-evaluation-harness integration task.
Top-level function for ProcessPoolExecutor (spawn) compatibility:
- run_lm_eval_tasks(hf_model_path, tasks, device, num_fewshot=0) -> dict
Requires: lm_eval >= 0.4 (installed as lm-eval 0.4.11)
"""
from __future__ import annotations
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
if str(_PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(_PROJECT_ROOT))
CHECKPOINT = str(_PROJECT_ROOT / "checkpoints" / "korean_3b_fp8_run1" / "checkpoint-0057000")
TOKENIZER_PATH = str(_PROJECT_ROOT / "tokenizer" / "korean_sp" / "tokenizer.json")
DATA_DIR = _PROJECT_ROOT / "data"
SEQ_LEN = 2048
STRIDE = 512
BATCH_SIZE = 32
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Main task function (must be top-level for pickle / spawn compatibility)
# ---------------------------------------------------------------------------
def run_lm_eval_tasks(
hf_model_path: str,
tasks: list[str],
device: str,
num_fewshot: int = 0,
) -> dict:
"""Run lm-evaluation-harness benchmarks on a HuggingFace-format model.
Isolates a single GPU via CUDA_VISIBLE_DEVICES so the function is safe
to run in a ProcessPoolExecutor worker without VRAM conflicts.
Args:
hf_model_path: Path to a HuggingFace-compatible model directory
(must contain config.json + safetensors/pytorch_model).
tasks: List of lm-eval task names, e.g.
["hellaswag", "arc_easy", "piqa"].
Unknown tasks are skipped with a warning.
device: CUDA device string, e.g. "cuda:7".
The function maps this to CUDA_VISIBLE_DEVICES=7 and
then uses device="cuda:0" inside lm_eval.
num_fewshot: Number of few-shot examples (0 = zero-shot).
Returns:
Dict with keys:
- model_path: hf_model_path as provided
- tasks_requested: original task list
- tasks_evaluated: tasks that were actually run
- tasks_skipped: tasks that were not available / errored
- per_task_metrics: dict mapping task name to metric sub-dict
- raw_results: full results dict from lm_eval.simple_evaluate
- elapsed_sec: wall-clock time for the evaluation
"""
# --- GPU isolation ---
gpu_index = int(device.split(":")[-1])
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_index)
# After this point use cuda:0 since only one GPU is visible
_internal_device = "cuda:0"
print(
f"[LM_EVAL] Starting on {device} "
f"(CUDA_VISIBLE_DEVICES={gpu_index}), tasks={tasks}, "
f"num_fewshot={num_fewshot}"
)
# --- Validate task list ---
try:
import lm_eval # type: ignore[import]
from lm_eval.tasks import TaskManager # type: ignore[import]
task_manager = TaskManager()
available_tasks: set[str] = set(task_manager.all_tasks)
except Exception as exc:
logger.warning(f"[LM_EVAL] Could not enumerate available tasks: {exc}")
available_tasks = set() # will attempt all and catch errors per task
valid_tasks: list[str] = []
skipped_tasks: list[str] = []
for t in tasks:
if (not available_tasks) or (t in available_tasks):
valid_tasks.append(t)
else:
logger.warning(f"[LM_EVAL] Task '{t}' not found in lm_eval registry — skipping.")
skipped_tasks.append(t)
if not valid_tasks:
print("[LM_EVAL] No valid tasks to evaluate.")
return {
"model_path": hf_model_path,
"tasks_requested": tasks,
"tasks_evaluated": [],
"tasks_skipped": skipped_tasks,
"per_task_metrics": {},
"raw_results": {},
"elapsed_sec": 0.0,
}
# --- Run evaluation ---
t0 = time.time()
raw_results: dict[str, Any] = {}
evaluated_tasks: list[str] = []
error_tasks: list[str] = []
# Attempt all valid tasks together first; fall back to per-task on error
try:
print(
f"[LM_EVAL] Evaluating {len(valid_tasks)} task(s) together: {valid_tasks}"
)
raw_results = lm_eval.simple_evaluate(
model="hf",
model_args=(
f"pretrained={hf_model_path},"
f"dtype=bfloat16,"
f"device={_internal_device}"
),
tasks=valid_tasks,
num_fewshot=num_fewshot,
batch_size="auto",
)
evaluated_tasks = list(valid_tasks)
except Exception as exc:
logger.warning(
f"[LM_EVAL] Batch evaluation failed ({exc}). "
"Falling back to per-task evaluation."
)
# Fall back: evaluate one task at a time
for task_name in valid_tasks:
try:
print(f"[LM_EVAL] Evaluating task '{task_name}' individually...")
task_result = lm_eval.simple_evaluate(
model="hf",
model_args=(
f"pretrained={hf_model_path},"
f"dtype=bfloat16,"
f"device={_internal_device}"
),
tasks=[task_name],
num_fewshot=num_fewshot,
batch_size="auto",
device=_internal_device,
)
# Merge per-task results into raw_results
if not raw_results:
raw_results = task_result
else:
if "results" in task_result and "results" in raw_results:
raw_results["results"].update(task_result.get("results", {}))
evaluated_tasks.append(task_name)
except Exception as task_exc:
logger.warning(
f"[LM_EVAL] Task '{task_name}' failed: {task_exc}"
)
error_tasks.append(task_name)
skipped_tasks.extend(error_tasks)
elapsed = time.time() - t0
# --- Extract per-task metrics ---
# Group tasks (e.g. global_mmlu_ko, mmlu) expand to subtasks at eval time.
# Capture ALL result keys, not just the originally requested task names,
# so that subtask-level metrics are available for downstream reporting.
per_task_metrics: dict[str, dict] = {}
lm_results: dict[str, Any] = raw_results.get("results", {})
for task_name, task_data in lm_results.items():
if not isinstance(task_data, dict):
continue
metrics: dict[str, Any] = {}
for key, value in task_data.items():
# Skip non-metric metadata keys
if key in ("alias", "group"):
continue
metrics[key] = value
per_task_metrics[task_name] = metrics
# Warn about any requested tasks that produced no results at all
for task_name in evaluated_tasks:
if task_name not in per_task_metrics:
logger.warning(
f"[LM_EVAL] Task '{task_name}' not found in results dict after evaluation."
)
# --- Summary print ---
print(f"[LM_EVAL] Evaluation complete in {elapsed:.1f}s")
for task_name, metrics in per_task_metrics.items():
# Print the most common accuracy variants
acc = metrics.get("acc,none") or metrics.get("acc") or metrics.get("accuracy")
acc_norm = metrics.get("acc_norm,none") or metrics.get("acc_norm")
if acc is not None:
line = f" {task_name}: acc={acc:.4f}"
if acc_norm is not None:
line += f", acc_norm={acc_norm:.4f}"
print(f"[LM_EVAL] {line}")
else:
print(f"[LM_EVAL] {task_name}: {metrics}")
if skipped_tasks:
print(f"[LM_EVAL] Skipped tasks: {skipped_tasks}")
return {
"model_path": hf_model_path,
"tasks_requested": tasks,
"tasks_evaluated": evaluated_tasks,
"tasks_skipped": skipped_tasks,
"per_task_metrics": per_task_metrics,
"raw_results": raw_results,
"elapsed_sec": round(elapsed, 1),
}
# ---------------------------------------------------------------------------
# Pipeline mode — load model ONCE, run multiple fewshot settings sequentially
# ---------------------------------------------------------------------------
def _extract_per_task_metrics(raw_results: dict) -> dict[str, dict]:
"""Extract per-task metrics from lm_eval raw results."""
per_task_metrics: dict[str, dict] = {}
lm_results: dict[str, Any] = raw_results.get("results", {})
for task_name, task_data in lm_results.items():
if not isinstance(task_data, dict):
continue
metrics = {k: v for k, v in task_data.items() if k not in ("alias", "group")}
per_task_metrics[task_name] = metrics
return per_task_metrics
def run_lm_eval_tasks_pipeline(
hf_model_path: str,
tasks: list[str],
device: str,
fewshot_values: list[int],
output_dir: str = "",
output_prefix: str = "",
) -> dict:
"""Run lm-eval with multiple fewshot settings, loading the model ONCE.
This avoids the overhead of loading the model N times when running
0-shot then 5-shot on the same GPU.
Returns:
Dict with keys like "0shot", "5shot", each containing the same
structure as run_lm_eval_tasks().
"""
import json as _json
import lm_eval # type: ignore[import]
from lm_eval.models.huggingface import HFLM # type: ignore[import]
# --- GPU isolation (same as run_lm_eval_tasks) ---
gpu_index = int(device.split(":")[-1])
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_index)
_internal_device = "cuda:0"
print(
f"[LM_EVAL_PIPELINE] Loading model once on {device} "
f"for fewshot={fewshot_values}, tasks={tasks}",
flush=True,
)
# --- Load model ONCE ---
model_obj = HFLM(
pretrained=hf_model_path,
dtype="bfloat16",
device=_internal_device,
batch_size="auto",
)
# --- Validate tasks ---
try:
from lm_eval.tasks import TaskManager # type: ignore[import]
available_tasks = set(TaskManager().all_tasks)
except Exception:
available_tasks = set()
valid_tasks = [t for t in tasks if (not available_tasks) or (t in available_tasks)]
skipped_tasks = [t for t in tasks if t not in valid_tasks]
if not valid_tasks:
print("[LM_EVAL_PIPELINE] No valid tasks.", flush=True)
empty = {
"model_path": hf_model_path,
"tasks_requested": tasks,
"tasks_evaluated": [],
"tasks_skipped": skipped_tasks,
"per_task_metrics": {},
"raw_results": {},
"elapsed_sec": 0.0,
}
return {f"{n}shot": empty for n in fewshot_values}
# --- Run each fewshot setting, reusing model_obj ---
all_results: dict[str, Any] = {}
for num_fewshot in fewshot_values:
print(
f"[LM_EVAL_PIPELINE] Running {num_fewshot}-shot on {valid_tasks}...",
flush=True,
)
t0 = time.time()
try:
raw_results = lm_eval.simple_evaluate(
model=model_obj,
tasks=valid_tasks,
num_fewshot=num_fewshot,
)
per_task_metrics = _extract_per_task_metrics(raw_results)
elapsed = time.time() - t0
shot_result = {
"model_path": hf_model_path,
"tasks_requested": tasks,
"tasks_evaluated": list(valid_tasks),
"tasks_skipped": list(skipped_tasks),
"per_task_metrics": per_task_metrics,
"raw_results": raw_results,
"elapsed_sec": round(elapsed, 1),
}
print(
f"[LM_EVAL_PIPELINE] {num_fewshot}-shot complete in {elapsed:.1f}s",
flush=True,
)
except Exception as exc:
elapsed = time.time() - t0
shot_result = {
"model_path": hf_model_path,
"tasks_requested": tasks,
"tasks_evaluated": [],
"tasks_skipped": list(tasks),
"per_task_metrics": {},
"raw_results": {},
"elapsed_sec": round(elapsed, 1),
"error": str(exc),
}
print(
f"[LM_EVAL_PIPELINE] {num_fewshot}-shot FAILED: {exc}",
flush=True,
)
all_results[f"{num_fewshot}shot"] = shot_result
# Save intermediate result per fewshot
if output_dir:
shot_path = Path(output_dir) / f"{output_prefix}_{num_fewshot}shot.json"
with open(shot_path, "w", encoding="utf-8") as f:
_json.dump(shot_result, f, ensure_ascii=False, indent=2, default=str)
return all_results