# CodeCraftLab / pipeline.py
# Source-viewer header (not part of the program): S-Dreamer's picture;
# "Upload 4 files"; commit b9ed97d (verified).
"""
Fine-tuning pipeline with structured logging and eval hooks.
Pipeline stages:
1. Preflight validation — config, GPU, disk, token
2. Dataset preparation — load, tokenize, split
3. Model initialisation — base model + LoRA adapters
4. Training — Trainer with custom callbacks
5. Evaluation — post-training metric suite
6. Checkpoint export — save + optional HF Hub push
Each stage emits structured log events. Eval hooks are composable and
run both during training (via TrainerCallback) and post-training.
"""
from __future__ import annotations
import json
import os
import shutil
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import structlog
import torch
from datasets import Dataset, DatasetDict, load_dataset
from peft import LoraConfig, TaskType, get_peft_model
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
DataCollatorForLanguageModeling,
PreTrainedModel,
PreTrainedTokenizerBase,
Trainer,
TrainerCallback,
TrainerControl,
TrainerState,
TrainingArguments,
)
from training.config import EvalMetric, EvalStrategy, TrainingJobConfig
from training.evaluators import (
BleuEvaluator,
ExecutionAccuracyEvaluator,
ExactMatchEvaluator,
PassAtKEvaluator,
)
# Module-level structlog logger named after this module; the classes in this
# file either use it directly or derive bound loggers from it via ``.bind()``.
log = structlog.get_logger(__name__)
# ---------------------------------------------------------------------------
# Eval result container
# ---------------------------------------------------------------------------
@dataclass
class EvalResults:
    """Outcome of a single evaluation pass.

    ``metrics`` maps a metric name to its score and ``errors`` collects one
    message per metric that failed; both start empty and are filled in by
    whoever runs the evaluation.
    """

    job_name: str
    epoch: float
    step: int
    metrics: dict[str, float] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)
    duration_seconds: float = 0.0

    def log(self, bound_log: structlog.BoundLogger) -> None:
        """Emit one ``eval.completed`` event plus one ``eval.error`` per failure.

        Metric scores are attached to the ``eval.completed`` event as extra
        key/value pairs. A metric whose name would clash with one of the fixed
        fields (or with the positional ``event`` argument) is emitted under a
        ``metric_`` prefix instead of raising ``TypeError`` for a duplicated
        keyword argument.

        Args:
            bound_log: Logger exposing ``info`` and ``warning`` methods that
                accept an event name followed by keyword fields.
        """
        event_fields: dict[str, Any] = {
            "epoch": self.epoch,
            "step": self.step,
            "duration_seconds": round(self.duration_seconds, 2),
        }
        for name, score in self.metrics.items():
            clashes = name in event_fields or name == "event"
            event_fields[f"metric_{name}" if clashes else name] = score
        bound_log.info("eval.completed", **event_fields)

        for error in self.errors:
            bound_log.warning("eval.error", message=error)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict snapshot of this result.

        ``metrics`` and ``errors`` are shallow-copied so that mutating this
        object afterwards does not alter a snapshot that was already stored.
        """
        return {
            "job_name": self.job_name,
            "epoch": self.epoch,
            "step": self.step,
            "metrics": dict(self.metrics),
            "errors": list(self.errors),
            "duration_seconds": self.duration_seconds,
        }
# ---------------------------------------------------------------------------
# Eval hook registry
# ---------------------------------------------------------------------------
class EvalHookRunner:
"""
Runs the configured evaluation metrics against a model + dataset.
Evaluators are resolved from the job config at construction time.
Each evaluator is independent; failures in one do not abort others.
"""
def __init__(self, config: TrainingJobConfig, tokenizer: PreTrainedTokenizerBase) -> None:
self._config = config
self._tokenizer = tokenizer
self._evaluators = self._build_evaluators()
self._log = log.bind(job=config.job_name)
def _build_evaluators(self) -> dict[EvalMetric, Any]:
evals: dict[EvalMetric, Any] = {}
eval_cfg = self._config.evaluation
for metric in eval_cfg.metrics:
match metric:
case EvalMetric.PASS_AT_1:
evals[metric] = PassAtKEvaluator(k=1, n=eval_cfg.num_samples_per_problem)
case EvalMetric.PASS_AT_10:
evals[metric] = PassAtKEvaluator(k=10, n=eval_cfg.num_samples_per_problem)
case EvalMetric.BLEU:
evals[metric] = BleuEvaluator()
case EvalMetric.EXECUTION_ACCURACY:
evals[metric] = ExecutionAccuracyEvaluator(
timeout=self._config.evaluation.timeout_seconds
)
case EvalMetric.EXACT_MATCH:
evals[metric] = ExactMatchEvaluator()
return evals
def run(
self,
model: PreTrainedModel,
eval_dataset: Dataset,
epoch: float,
step: int,
) -> EvalResults:
start = time.perf_counter()
results = EvalResults(job_name=self._config.job_name, epoch=epoch, step=step)
model.eval()
with torch.no_grad():
for metric, evaluator in self._evaluators.items():
try:
score = evaluator.evaluate(
model=model,
tokenizer=self._tokenizer,
dataset=eval_dataset,
)
results.metrics[metric.value] = round(score, 4)
self._log.info("eval.metric", metric=metric.value, score=score)
except Exception as exc: # noqa: BLE001
msg = f"{metric.value}: {exc}"
results.errors.append(msg)
self._log.warning("eval.metric_failed", metric=metric.value, error=str(exc))
results.duration_seconds = time.perf_counter() - start
results.log(self._log)
return results
# ---------------------------------------------------------------------------
# Custom training callback
# ---------------------------------------------------------------------------
class CodeCraftLabCallback(TrainerCallback):
    """
    Injects structured logging and eval hooks into the HF Trainer loop.

    At the end of every epoch the eval hook runner is invoked and the
    accumulated results are rewritten to ``results_path`` as JSON.
    """

    def __init__(
        self,
        hook_runner: EvalHookRunner,
        eval_dataset: Dataset,
        results_path: Path,
    ) -> None:
        """Keep references to the runner, the eval dataset and the output file.

        Args:
            hook_runner: Object whose ``run`` method performs the evaluation.
            eval_dataset: Dataset forwarded to ``hook_runner.run`` each epoch.
            results_path: JSON file that receives the list of all results.
        """
        self._runner = hook_runner
        self._eval_dataset = eval_dataset
        self._results_path = results_path
        self._all_results: list[dict[str, Any]] = []
        self._log = log

    def on_epoch_end(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        model: PreTrainedModel,
        **kwargs: Any,
    ) -> TrainerControl:
        """Log the epoch summary, run the eval hooks and persist the results."""
        # The newest log_history entry is not guaranteed to carry "loss", so
        # report the most recent entry that does.
        latest_loss = next(
            (entry["loss"] for entry in reversed(state.log_history) if "loss" in entry),
            None,
        )
        self._log.info(
            "training.epoch_end",
            epoch=state.epoch,
            step=state.global_step,
            loss=latest_loss,
        )
        results = self._runner.run(
            model=model,
            eval_dataset=self._eval_dataset,
            epoch=state.epoch or 0.0,
            step=state.global_step,
        )
        self._all_results.append(results.to_dict())
        self._persist_results()
        return control

    def on_log(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        logs: dict[str, float] | None = None,
        **kwargs: Any,
    ) -> TrainerControl:
        """Forward the trainer's log payload as a ``training.log`` event.

        Keys named ``step`` or ``event`` are left out of the forwarded payload
        because they would collide with arguments this method already passes
        to the logger.
        """
        payload = {
            key: value
            for key, value in (logs or {}).items()
            if key not in ("step", "event")
        }
        self._log.info("training.log", step=state.global_step, **payload)
        return control

    def on_train_end(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        **kwargs: Any,
    ) -> TrainerControl:
        """Log a ``training.completed`` event with the final step and FLOs."""
        self._log.info(
            "training.completed",
            total_steps=state.global_step,
            total_flos=state.total_flos,
        )
        return control

    def _persist_results(self) -> None:
        """Write every result collected so far to ``results_path`` as JSON.

        The payload is written to a sibling ``.tmp`` file and then moved over
        the destination, so an interruption mid-write cannot leave a truncated
        results file behind.
        """
        target = self._results_path
        target.parent.mkdir(parents=True, exist_ok=True)
        staging = target.with_name(target.name + ".tmp")
        staging.write_text(json.dumps(self._all_results, indent=2), encoding="utf-8")
        staging.replace(target)
# ---------------------------------------------------------------------------
# Pipeline
# ---------------------------------------------------------------------------
class FineTuningPipeline:
    """
    Orchestrates the full fine-tuning lifecycle.

    Usage:
        config = TrainingJobConfig.model_validate(raw_dict)
        pipeline = FineTuningPipeline(config)
        pipeline.run()
    """

    # Free-space threshold in GB; preflight only warns below it, never aborts.
    _MIN_FREE_DISK_GB = 20

    def __init__(self, config: TrainingJobConfig) -> None:
        """Bind a job-scoped logger and derive ``<output_dir>/<job_name>``."""
        self._config = config
        self._log = log.bind(job=config.job_name, model=config.base_model)
        self._output_dir = Path(config.checkpoint.output_dir) / config.job_name

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def run(self) -> Path:
        """Execute all pipeline stages. Returns the final checkpoint path."""
        self._log.info("pipeline.started")
        self._preflight()
        datasets = self._prepare_datasets()
        model, tokenizer = self._load_model()
        self._train(model, tokenizer, datasets)
        final_path = self._export(model, tokenizer)
        self._log.info("pipeline.finished", output=str(final_path))
        return final_path

    # ------------------------------------------------------------------
    # Stage 1: Preflight
    # ------------------------------------------------------------------
    def _preflight(self) -> None:
        """Validate the environment and create the output directory.

        Raises:
            EnvironmentError: ``hub.push_to_hub`` is enabled but the
                ``HF_TOKEN`` environment variable is unset or empty.
        """
        self._log.info("pipeline.preflight")
        # Validate config (already done at submission, but be defensive)
        self._config.model_validate(self._config.model_dump())

        # HF token if pushing. Checked before anything touches the filesystem
        # so a misconfigured job fails without side effects.
        if self._config.hub.push_to_hub and not os.environ.get("HF_TOKEN"):
            raise EnvironmentError("HF_TOKEN is required when hub.push_to_hub=true")

        # GPU check
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name(0)
            vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
            self._log.info("preflight.gpu", device=device_name, vram_gb=round(vram_gb, 1))
        else:
            self._log.warning("preflight.no_gpu", message="Training on CPU — will be slow")

        # Create the directory BEFORE probing it: shutil.disk_usage raises
        # FileNotFoundError for a path that does not exist yet, which is the
        # normal situation on the first run against a fresh checkpoint root.
        self._output_dir.mkdir(parents=True, exist_ok=True)

        # Disk space (rough check)
        free_gb = shutil.disk_usage(self._output_dir).free / 1e9
        if free_gb < self._MIN_FREE_DISK_GB:
            self._log.warning("preflight.disk_low", free_gb=round(free_gb, 1))

        self._log.info("preflight.passed")

    # ------------------------------------------------------------------
    # Stage 2: Dataset preparation
    # ------------------------------------------------------------------
    def _prepare_datasets(self) -> DatasetDict:
        """Load the dataset, optionally cap and shuffle it, and split it.

        Returns:
            A ``DatasetDict`` with ``"train"`` and ``"eval"`` splits, where the
            first ``int(len * split_ratio)`` rows form the train split.

        Raises:
            ValueError: The train split would contain no rows.
        """
        self._log.info("pipeline.dataset_prep")
        ds_cfg = self._config.dataset

        # Load — support both HF Hub paths and internal dataset IDs
        raw: Dataset
        if ds_cfg.dataset_id.startswith("ds_"):
            # Internal dataset — load from local store
            raw = Dataset.load_from_disk(f"./data/{ds_cfg.dataset_id}")
        else:
            raw = load_dataset(ds_cfg.dataset_id, split="train")  # type: ignore[assignment]

        # NOTE: the cap is applied before shuffling, so it keeps the first
        # ``max_samples`` rows of the source ordering.
        if ds_cfg.max_samples:
            raw = raw.select(range(min(ds_cfg.max_samples, len(raw))))
        if ds_cfg.shuffle:
            raw = raw.shuffle(seed=ds_cfg.shuffle_seed)

        total = len(raw)
        n_train = int(total * ds_cfg.split_ratio)
        if n_train <= 0:
            raise ValueError(
                f"Train split is empty ({total} samples, split_ratio={ds_cfg.split_ratio})"
            )
        if n_train >= total:
            # Not fatal here, but every eval hook will run on zero rows.
            self._log.warning("dataset.eval_split_empty", total=total, train_size=n_train)

        splits = DatasetDict(
            {
                "train": raw.select(range(n_train)),
                "eval": raw.select(range(n_train, total)),
            }
        )
        self._log.info(
            "dataset.prepared",
            train_size=len(splits["train"]),
            eval_size=len(splits["eval"]),
            column=ds_cfg.text_column,
        )
        return splits

    # ------------------------------------------------------------------
    # Stage 3: Model initialisation
    # ------------------------------------------------------------------
    def _load_model(self) -> tuple[PreTrainedModel, PreTrainedTokenizerBase]:
        """Load tokenizer and base model, wrapping the model in LoRA if enabled.

        Returns:
            ``(model, tokenizer)``. When the tokenizer has no pad token, its
            EOS token is reused as the pad token.
        """
        self._log.info("pipeline.model_load")
        hp = self._config.training
        dtype_map = {
            "fp32": torch.float32,
            "fp16": torch.float16,
            "bf16": torch.bfloat16,
        }
        # Unknown precision names fall back to bfloat16.
        torch_dtype = dtype_map.get(hp.precision.value, torch.bfloat16)

        tokenizer = AutoTokenizer.from_pretrained(self._config.base_model)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            self._config.base_model,
            torch_dtype=torch_dtype,
            device_map="auto" if torch.cuda.is_available() else "cpu",
        )

        if self._config.lora and self._config.lora.enabled:
            lora_cfg = self._config.lora
            peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                r=lora_cfg.r,
                lora_alpha=lora_cfg.alpha,
                lora_dropout=lora_cfg.dropout,
                target_modules=lora_cfg.target_modules,
                bias=lora_cfg.bias,  # type: ignore[arg-type]
            )
            model = get_peft_model(model, peft_config)
            trainable, total = model.get_nb_trainable_parameters()
            self._log.info(
                "model.lora_applied",
                trainable_params=trainable,
                total_params=total,
                # Guarded so a zero parameter count cannot raise here.
                trainable_pct=round(100 * trainable / total, 2) if total else 0.0,
            )
        else:
            self._log.info("model.full_finetune")
        return model, tokenizer  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Stage 4: Training
    # ------------------------------------------------------------------
    def _train(
        self,
        model: PreTrainedModel,
        tokenizer: PreTrainedTokenizerBase,
        datasets: DatasetDict,
    ) -> None:
        """Tokenize the splits and run the HF ``Trainer`` with the custom callback.

        The Trainer receives the tokenized splits, while the callback's eval
        hooks receive the un-tokenized ``datasets["eval"]`` split.
        """
        self._log.info("pipeline.training_start")
        hp = self._config.training
        ckpt = self._config.checkpoint
        eval_cfg = self._config.evaluation
        text_column = self._config.dataset.text_column

        def tokenize(examples: dict[str, list[str]]) -> dict[str, Any]:
            # No padding here; batches are padded later by the data collator.
            return tokenizer(
                examples[text_column],
                truncation=True,
                max_length=hp.max_seq_length,
                padding=False,
            )

        tokenized = datasets.map(
            tokenize,
            batched=True,
            remove_columns=datasets["train"].column_names,
        )

        # Newer transformers releases name this argument ``eval_strategy`` and
        # older ones ``evaluation_strategy``; use whichever the installed
        # TrainingArguments dataclass actually declares.
        declared_args = getattr(TrainingArguments, "__dataclass_fields__", {})
        strategy_kwarg = (
            "eval_strategy" if "eval_strategy" in declared_args else "evaluation_strategy"
        )

        training_args = TrainingArguments(
            output_dir=str(self._output_dir),
            num_train_epochs=hp.num_epochs,
            per_device_train_batch_size=hp.batch_size,
            per_device_eval_batch_size=hp.batch_size,
            gradient_accumulation_steps=hp.gradient_accumulation_steps,
            learning_rate=hp.learning_rate,
            weight_decay=hp.weight_decay,
            warmup_ratio=hp.warmup_ratio,
            max_grad_norm=hp.max_grad_norm,
            optim=hp.optimizer.value,
            lr_scheduler_type=hp.lr_scheduler,
            fp16=hp.precision.value == "fp16",
            bf16=hp.precision.value == "bf16",
            eval_steps=eval_cfg.eval_steps,
            save_strategy=ckpt.save_strategy.value,
            save_steps=ckpt.save_steps,
            save_total_limit=ckpt.save_total_limit,
            load_best_model_at_end=eval_cfg.load_best_model_at_end,
            metric_for_best_model=eval_cfg.metric_for_best_model.value,
            greater_is_better=eval_cfg.greater_is_better,
            seed=hp.seed,
            dataloader_num_workers=hp.dataloader_num_workers,
            report_to="none",  # structlog handles all logging
            logging_steps=10,
            resume_from_checkpoint=ckpt.resume_from_checkpoint,
            push_to_hub=False,  # push handled separately in export stage
            **{strategy_kwarg: eval_cfg.strategy.value},
        )

        hook_runner = EvalHookRunner(self._config, tokenizer)
        results_path = self._output_dir / "eval_results.json"
        callback = CodeCraftLabCallback(
            hook_runner=hook_runner,
            eval_dataset=datasets["eval"],
            results_path=results_path,
        )
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized["train"],
            eval_dataset=tokenized["eval"],
            data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
            callbacks=[callback],
        )
        trainer.train(resume_from_checkpoint=ckpt.resume_from_checkpoint)

    # ------------------------------------------------------------------
    # Stage 5: Export + Hub push
    # ------------------------------------------------------------------
    def _export(self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase) -> Path:
        """Save model and tokenizer under ``<output_dir>/final``; optionally push.

        Returns:
            The directory the model and tokenizer were saved to.
        """
        self._log.info("pipeline.export")
        final_path = self._output_dir / "final"
        model.save_pretrained(str(final_path))
        tokenizer.save_pretrained(str(final_path))
        self._log.info("model.saved", path=str(final_path))

        hub_cfg = self._config.hub
        if hub_cfg.push_to_hub and hub_cfg.repo_id:
            self._log.info("hub.pushing", repo_id=hub_cfg.repo_id)
            model.push_to_hub(hub_cfg.repo_id, private=hub_cfg.private)
            tokenizer.push_to_hub(hub_cfg.repo_id, private=hub_cfg.private)
            self._log.info("hub.pushed", repo_id=hub_cfg.repo_id)
        elif hub_cfg.push_to_hub:
            # A push was requested but there is nowhere to push to; say so
            # instead of skipping it without any trace.
            self._log.warning("hub.push_skipped", reason="hub.repo_id is not set")
        return final_path