""" Fine-tuning pipeline with structured logging and eval hooks. Pipeline stages: 1. Preflight validation — config, GPU, disk, token 2. Dataset preparation — load, tokenize, split 3. Model initialisation — base model + LoRA adapters 4. Training — Trainer with custom callbacks 5. Evaluation — post-training metric suite 6. Checkpoint export — save + optional HF Hub push Each stage emits structured log events. Eval hooks are composable and run both during training (via TrainerCallback) and post-training. """ from __future__ import annotations import json import os import shutil import time from dataclasses import dataclass, field from pathlib import Path from typing import Any import structlog import torch from datasets import Dataset, DatasetDict, load_dataset from peft import LoraConfig, TaskType, get_peft_model from transformers import ( AutoModelForCausalLM, AutoTokenizer, DataCollatorForLanguageModeling, PreTrainedModel, PreTrainedTokenizerBase, Trainer, TrainerCallback, TrainerControl, TrainerState, TrainingArguments, ) from training.config import EvalMetric, EvalStrategy, TrainingJobConfig from training.evaluators import ( BleuEvaluator, ExecutionAccuracyEvaluator, ExactMatchEvaluator, PassAtKEvaluator, ) log = structlog.get_logger(__name__) # --------------------------------------------------------------------------- # Eval result container # --------------------------------------------------------------------------- @dataclass class EvalResults: job_name: str epoch: float step: int metrics: dict[str, float] = field(default_factory=dict) errors: list[str] = field(default_factory=list) duration_seconds: float = 0.0 def log(self, bound_log: structlog.BoundLogger) -> None: bound_log.info( "eval.completed", epoch=self.epoch, step=self.step, duration_seconds=round(self.duration_seconds, 2), **self.metrics, ) for error in self.errors: bound_log.warning("eval.error", message=error) def to_dict(self) -> dict[str, Any]: return { "job_name": self.job_name, "epoch": 
self.epoch, "step": self.step, "metrics": self.metrics, "errors": self.errors, "duration_seconds": self.duration_seconds, } # --------------------------------------------------------------------------- # Eval hook registry # --------------------------------------------------------------------------- class EvalHookRunner: """ Runs the configured evaluation metrics against a model + dataset. Evaluators are resolved from the job config at construction time. Each evaluator is independent; failures in one do not abort others. """ def __init__(self, config: TrainingJobConfig, tokenizer: PreTrainedTokenizerBase) -> None: self._config = config self._tokenizer = tokenizer self._evaluators = self._build_evaluators() self._log = log.bind(job=config.job_name) def _build_evaluators(self) -> dict[EvalMetric, Any]: evals: dict[EvalMetric, Any] = {} eval_cfg = self._config.evaluation for metric in eval_cfg.metrics: match metric: case EvalMetric.PASS_AT_1: evals[metric] = PassAtKEvaluator(k=1, n=eval_cfg.num_samples_per_problem) case EvalMetric.PASS_AT_10: evals[metric] = PassAtKEvaluator(k=10, n=eval_cfg.num_samples_per_problem) case EvalMetric.BLEU: evals[metric] = BleuEvaluator() case EvalMetric.EXECUTION_ACCURACY: evals[metric] = ExecutionAccuracyEvaluator( timeout=self._config.evaluation.timeout_seconds ) case EvalMetric.EXACT_MATCH: evals[metric] = ExactMatchEvaluator() return evals def run( self, model: PreTrainedModel, eval_dataset: Dataset, epoch: float, step: int, ) -> EvalResults: start = time.perf_counter() results = EvalResults(job_name=self._config.job_name, epoch=epoch, step=step) model.eval() with torch.no_grad(): for metric, evaluator in self._evaluators.items(): try: score = evaluator.evaluate( model=model, tokenizer=self._tokenizer, dataset=eval_dataset, ) results.metrics[metric.value] = round(score, 4) self._log.info("eval.metric", metric=metric.value, score=score) except Exception as exc: # noqa: BLE001 msg = f"{metric.value}: {exc}" results.errors.append(msg) 
self._log.warning("eval.metric_failed", metric=metric.value, error=str(exc)) results.duration_seconds = time.perf_counter() - start results.log(self._log) return results # --------------------------------------------------------------------------- # Custom training callback # --------------------------------------------------------------------------- class CodeCraftLabCallback(TrainerCallback): """ Injects structured logging and eval hooks into the HF Trainer loop. """ def __init__( self, hook_runner: EvalHookRunner, eval_dataset: Dataset, results_path: Path, ) -> None: self._runner = hook_runner self._eval_dataset = eval_dataset self._results_path = results_path self._all_results: list[dict[str, Any]] = [] self._log = log def on_epoch_end( self, args: TrainingArguments, state: TrainerState, control: TrainerControl, model: PreTrainedModel, **kwargs: Any, ) -> TrainerControl: self._log.info( "training.epoch_end", epoch=state.epoch, step=state.global_step, loss=state.log_history[-1].get("loss") if state.log_history else None, ) results = self._runner.run( model=model, eval_dataset=self._eval_dataset, epoch=state.epoch or 0.0, step=state.global_step, ) self._all_results.append(results.to_dict()) self._persist_results() return control def on_log( self, args: TrainingArguments, state: TrainerState, control: TrainerControl, logs: dict[str, float], **kwargs: Any, ) -> TrainerControl: self._log.info("training.log", step=state.global_step, **logs) return control def on_train_end( self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs: Any, ) -> TrainerControl: self._log.info( "training.completed", total_steps=state.global_step, total_flos=state.total_flos, ) return control def _persist_results(self) -> None: self._results_path.write_text( json.dumps(self._all_results, indent=2), encoding="utf-8" ) # --------------------------------------------------------------------------- # Pipeline # 
class FineTuningPipeline:
    """
    Orchestrates the full fine-tuning lifecycle.

    Usage:
        config = TrainingJobConfig.model_validate(raw_dict)
        pipeline = FineTuningPipeline(config)
        pipeline.run()
    """

    # Free-space threshold (GB) below which preflight logs a warning.
    _MIN_FREE_DISK_GB = 20

    def __init__(self, config: TrainingJobConfig) -> None:
        self._config = config
        self._log = log.bind(job=config.job_name, model=config.base_model)
        # All artefacts for this job live under <checkpoint.output_dir>/<job_name>.
        self._output_dir = Path(config.checkpoint.output_dir) / config.job_name

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def run(self) -> Path:
        """Execute all pipeline stages. Returns the final checkpoint path."""
        self._log.info("pipeline.started")

        self._preflight()
        datasets = self._prepare_datasets()
        model, tokenizer = self._load_model()
        self._train(model, tokenizer, datasets)
        final_path = self._export(model, tokenizer)

        self._log.info("pipeline.finished", output=str(final_path))
        return final_path

    # ------------------------------------------------------------------
    # Stage 1: Preflight
    # ------------------------------------------------------------------

    @staticmethod
    def _nearest_existing_dir(path: Path) -> Path:
        """Return ``path`` or its closest ancestor that exists on disk."""
        probe = path
        # Stop at the filesystem root / "." where parent == self.
        while not probe.exists() and probe != probe.parent:
            probe = probe.parent
        return probe

    def _preflight(self) -> None:
        """Validate the environment before any expensive work starts.

        Raises:
            EnvironmentError: if a hub push is requested but ``HF_TOKEN`` is
                not set in the environment.
        """
        self._log.info("pipeline.preflight")

        # Validate config (already done at submission, but be defensive)
        self._config.model_validate(self._config.model_dump())

        # GPU check
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name(0)
            vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
            self._log.info("preflight.gpu", device=device_name, vram_gb=round(vram_gb, 1))
        else:
            self._log.warning("preflight.no_gpu", message="Training on CPU — will be slow")

        # Disk space (rough check). The output directory is only created at
        # the end of preflight, so probe the nearest ancestor that already
        # exists; disk_usage raises FileNotFoundError on a missing path.
        probe_dir = self._nearest_existing_dir(self._output_dir)
        free_gb = shutil.disk_usage(probe_dir).free / 1e9
        if free_gb < self._MIN_FREE_DISK_GB:
            self._log.warning("preflight.disk_low", free_gb=round(free_gb, 1))

        # HF token if pushing
        if self._config.hub.push_to_hub and not os.environ.get("HF_TOKEN"):
            raise EnvironmentError("HF_TOKEN is required when hub.push_to_hub=true")

        self._output_dir.mkdir(parents=True, exist_ok=True)
        self._log.info("preflight.passed")

    # ------------------------------------------------------------------
    # Stage 2: Dataset preparation
    # ------------------------------------------------------------------

    def _prepare_datasets(self) -> DatasetDict:
        """Load the raw dataset and split it into ``train`` / ``eval``.

        Returns:
            A ``DatasetDict`` with untokenized ``train`` and ``eval`` splits.
        """
        self._log.info("pipeline.dataset_prep")
        ds_cfg = self._config.dataset

        # Load — support both HF Hub paths and internal dataset IDs
        raw: Dataset
        if ds_cfg.dataset_id.startswith("ds_"):
            # Internal dataset — load from local store
            raw = Dataset.load_from_disk(f"./data/{ds_cfg.dataset_id}")
        else:
            raw = load_dataset(ds_cfg.dataset_id, split="train")  # type: ignore[assignment]

        # NOTE(review): truncation happens before shuffling, so max_samples
        # always keeps the first N rows — confirm this is intended.
        if ds_cfg.max_samples:
            raw = raw.select(range(min(ds_cfg.max_samples, len(raw))))

        if ds_cfg.shuffle:
            raw = raw.shuffle(seed=ds_cfg.shuffle_seed)

        n_total = len(raw)
        n_train = int(n_total * ds_cfg.split_ratio)
        splits = DatasetDict(
            {
                "train": raw.select(range(n_train)),
                "eval": raw.select(range(n_train, n_total)),
            }
        )

        # Surface degenerate splits early instead of failing later in training.
        if n_train == 0 or n_train == n_total:
            self._log.warning(
                "dataset.empty_split",
                train_size=n_train,
                eval_size=n_total - n_train,
                split_ratio=ds_cfg.split_ratio,
            )

        self._log.info(
            "dataset.prepared",
            train_size=len(splits["train"]),
            eval_size=len(splits["eval"]),
            column=ds_cfg.text_column,
        )
        return splits

    # ------------------------------------------------------------------
    # Stage 3: Model initialisation
    # ------------------------------------------------------------------

    def _load_model(self) -> tuple[PreTrainedModel, PreTrainedTokenizerBase]:
        """Load the base model + tokenizer and wrap with LoRA when enabled.

        Returns:
            ``(model, tokenizer)``; the model is a PEFT-wrapped model when
            ``config.lora.enabled`` is set, otherwise the plain base model.
        """
        self._log.info("pipeline.model_load")
        hp = self._config.training

        dtype_map = {
            "fp32": torch.float32,
            "fp16": torch.float16,
            "bf16": torch.bfloat16,
        }
        # Unrecognised precision values fall back to bfloat16.
        torch_dtype = dtype_map.get(hp.precision.value, torch.bfloat16)

        tokenizer = AutoTokenizer.from_pretrained(self._config.base_model)
        if tokenizer.pad_token is None:
            # Reuse EOS as the padding token when the tokenizer has none.
            tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            self._config.base_model,
            torch_dtype=torch_dtype,
            device_map="auto" if torch.cuda.is_available() else "cpu",
        )

        if self._config.lora and self._config.lora.enabled:
            lora_cfg = self._config.lora
            peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                r=lora_cfg.r,
                lora_alpha=lora_cfg.alpha,
                lora_dropout=lora_cfg.dropout,
                target_modules=lora_cfg.target_modules,
                bias=lora_cfg.bias,  # type: ignore[arg-type]
            )
            model = get_peft_model(model, peft_config)
            trainable, total = model.get_nb_trainable_parameters()
            self._log.info(
                "model.lora_applied",
                trainable_params=trainable,
                total_params=total,
                trainable_pct=round(100 * trainable / total, 2),
            )
        else:
            self._log.info("model.full_finetune")

        return model, tokenizer  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Stage 4: Training
    # ------------------------------------------------------------------

    def _train(
        self,
        model: PreTrainedModel,
        tokenizer: PreTrainedTokenizerBase,
        datasets: DatasetDict,
    ) -> None:
        """Tokenize the splits and run the HF ``Trainer`` with the eval callback.

        The callback receives the *untokenized* eval split; the Trainer itself
        gets the tokenized splits.
        """
        self._log.info("pipeline.training_start")
        hp = self._config.training
        ckpt = self._config.checkpoint
        eval_cfg = self._config.evaluation
        text_column = self._config.dataset.text_column

        def tokenize(examples: dict[str, list[str]]) -> dict[str, Any]:
            # Padding is left to the data collator (dynamic per batch).
            return tokenizer(
                examples[text_column],
                truncation=True,
                max_length=hp.max_seq_length,
                padding=False,
            )

        tokenized = datasets.map(
            tokenize,
            batched=True,
            remove_columns=datasets["train"].column_names,
        )

        # Tolerate an unset metric_for_best_model instead of failing on
        # ``None.value``.
        best_metric = eval_cfg.metric_for_best_model
        best_metric_name = best_metric.value if best_metric is not None else None

        # NOTE(review): newer transformers releases rename
        # ``evaluation_strategy`` to ``eval_strategy`` — confirm against the
        # pinned version before upgrading.
        # NOTE(review): load_best_model_at_end needs metric_for_best_model to
        # be reported by Trainer.evaluate(); the custom metrics here are
        # computed in the callback — verify they line up.
        training_args = TrainingArguments(
            output_dir=str(self._output_dir),
            num_train_epochs=hp.num_epochs,
            per_device_train_batch_size=hp.batch_size,
            per_device_eval_batch_size=hp.batch_size,
            gradient_accumulation_steps=hp.gradient_accumulation_steps,
            learning_rate=hp.learning_rate,
            weight_decay=hp.weight_decay,
            warmup_ratio=hp.warmup_ratio,
            max_grad_norm=hp.max_grad_norm,
            optim=hp.optimizer.value,
            lr_scheduler_type=hp.lr_scheduler,
            fp16=hp.precision.value == "fp16",
            bf16=hp.precision.value == "bf16",
            evaluation_strategy=eval_cfg.strategy.value,
            eval_steps=eval_cfg.eval_steps,
            save_strategy=ckpt.save_strategy.value,
            save_steps=ckpt.save_steps,
            save_total_limit=ckpt.save_total_limit,
            load_best_model_at_end=eval_cfg.load_best_model_at_end,
            metric_for_best_model=best_metric_name,
            greater_is_better=eval_cfg.greater_is_better,
            seed=hp.seed,
            dataloader_num_workers=hp.dataloader_num_workers,
            report_to="none",  # structlog handles all logging
            logging_steps=10,
            resume_from_checkpoint=ckpt.resume_from_checkpoint,
            push_to_hub=False,  # push handled separately in export stage
        )

        hook_runner = EvalHookRunner(self._config, tokenizer)
        results_path = self._output_dir / "eval_results.json"
        callback = CodeCraftLabCallback(
            hook_runner=hook_runner,
            eval_dataset=datasets["eval"],
            results_path=results_path,
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized["train"],
            eval_dataset=tokenized["eval"],
            data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
            callbacks=[callback],
        )

        trainer.train(resume_from_checkpoint=ckpt.resume_from_checkpoint)

    # ------------------------------------------------------------------
    # Stage 6: Export + Hub push
    # (stage 5, evaluation, runs inside the training callback)
    # ------------------------------------------------------------------

    def _export(self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase) -> Path:
        """Save model + tokenizer under ``<output_dir>/final`` and optionally push.

        Returns:
            Path of the directory the final checkpoint was written to.
        """
        self._log.info("pipeline.export")
        final_path = self._output_dir / "final"
        model.save_pretrained(str(final_path))
        tokenizer.save_pretrained(str(final_path))
        self._log.info("model.saved", path=str(final_path))

        hub_cfg = self._config.hub
        if hub_cfg.push_to_hub:
            if hub_cfg.repo_id:
                self._log.info("hub.pushing", repo_id=hub_cfg.repo_id)
                model.push_to_hub(hub_cfg.repo_id, private=hub_cfg.private)
                tokenizer.push_to_hub(hub_cfg.repo_id, private=hub_cfg.private)
                self._log.info("hub.pushed", repo_id=hub_cfg.repo_id)
            else:
                # Previously skipped without any trace; make the
                # misconfiguration visible while keeping the local export.
                self._log.warning(
                    "hub.push_skipped",
                    message="hub.push_to_hub is set but hub.repo_id is empty",
                )

        return final_path