Spaces:
Runtime error
Runtime error
| """ | |
| Fine-tuning pipeline with structured logging and eval hooks. | |
| Pipeline stages: | |
| 1. Preflight validation — config, GPU, disk, token | |
| 2. Dataset preparation — load, tokenize, split | |
| 3. Model initialisation — base model + LoRA adapters | |
| 4. Training — Trainer with custom callbacks | |
| 5. Evaluation — post-training metric suite | |
| 6. Checkpoint export — save + optional HF Hub push | |
| Each stage emits structured log events. Eval hooks are composable and | |
| run both during training (via TrainerCallback) and post-training. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import shutil | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any | |
| import structlog | |
| import torch | |
| from datasets import Dataset, DatasetDict, load_dataset | |
| from peft import LoraConfig, TaskType, get_peft_model | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| DataCollatorForLanguageModeling, | |
| PreTrainedModel, | |
| PreTrainedTokenizerBase, | |
| Trainer, | |
| TrainerCallback, | |
| TrainerControl, | |
| TrainerState, | |
| TrainingArguments, | |
| ) | |
| from training.config import EvalMetric, EvalStrategy, TrainingJobConfig | |
| from training.evaluators import ( | |
| BleuEvaluator, | |
| ExecutionAccuracyEvaluator, | |
| ExactMatchEvaluator, | |
| PassAtKEvaluator, | |
| ) | |
| log = structlog.get_logger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Eval result container | |
| # --------------------------------------------------------------------------- | |
@dataclass
class EvalResults:
    """Outcome of one evaluation pass over the configured metrics.

    This must be a dataclass: the ``field(default_factory=...)`` defaults
    below only turn into fresh per-instance containers, and the class only
    accepts ``job_name``/``epoch``/``step`` keyword arguments, when the
    ``@dataclass`` decorator generates ``__init__``.
    """

    job_name: str  # name of the job the evaluation belongs to
    epoch: float  # epoch value supplied by the caller
    step: int  # step value supplied by the caller
    metrics: dict[str, float] = field(default_factory=dict)  # metric name -> score
    errors: list[str] = field(default_factory=list)  # one message per failed metric
    duration_seconds: float = 0.0  # elapsed time of the whole pass

    def log(self, bound_log: "structlog.BoundLogger") -> None:
        """Emit one ``eval.completed`` event plus one warning per recorded error.

        Args:
            bound_log: Logger the events are written to. Every metric is
                passed as its own keyword field on the ``eval.completed`` event.
        """
        bound_log.info(
            "eval.completed",
            epoch=self.epoch,
            step=self.step,
            duration_seconds=round(self.duration_seconds, 2),
            **self.metrics,
        )
        for error in self.errors:
            bound_log.warning("eval.error", message=error)

    def to_dict(self) -> dict[str, Any]:
        """Return the results as a plain dict with one key per field."""
        return {
            "job_name": self.job_name,
            "epoch": self.epoch,
            "step": self.step,
            "metrics": self.metrics,
            "errors": self.errors,
            "duration_seconds": self.duration_seconds,
        }
| # --------------------------------------------------------------------------- | |
| # Eval hook registry | |
| # --------------------------------------------------------------------------- | |
| class EvalHookRunner: | |
| """ | |
| Runs the configured evaluation metrics against a model + dataset. | |
| Evaluators are resolved from the job config at construction time. | |
| Each evaluator is independent; failures in one do not abort others. | |
| """ | |
| def __init__(self, config: TrainingJobConfig, tokenizer: PreTrainedTokenizerBase) -> None: | |
| self._config = config | |
| self._tokenizer = tokenizer | |
| self._evaluators = self._build_evaluators() | |
| self._log = log.bind(job=config.job_name) | |
| def _build_evaluators(self) -> dict[EvalMetric, Any]: | |
| evals: dict[EvalMetric, Any] = {} | |
| eval_cfg = self._config.evaluation | |
| for metric in eval_cfg.metrics: | |
| match metric: | |
| case EvalMetric.PASS_AT_1: | |
| evals[metric] = PassAtKEvaluator(k=1, n=eval_cfg.num_samples_per_problem) | |
| case EvalMetric.PASS_AT_10: | |
| evals[metric] = PassAtKEvaluator(k=10, n=eval_cfg.num_samples_per_problem) | |
| case EvalMetric.BLEU: | |
| evals[metric] = BleuEvaluator() | |
| case EvalMetric.EXECUTION_ACCURACY: | |
| evals[metric] = ExecutionAccuracyEvaluator( | |
| timeout=self._config.evaluation.timeout_seconds | |
| ) | |
| case EvalMetric.EXACT_MATCH: | |
| evals[metric] = ExactMatchEvaluator() | |
| return evals | |
| def run( | |
| self, | |
| model: PreTrainedModel, | |
| eval_dataset: Dataset, | |
| epoch: float, | |
| step: int, | |
| ) -> EvalResults: | |
| start = time.perf_counter() | |
| results = EvalResults(job_name=self._config.job_name, epoch=epoch, step=step) | |
| model.eval() | |
| with torch.no_grad(): | |
| for metric, evaluator in self._evaluators.items(): | |
| try: | |
| score = evaluator.evaluate( | |
| model=model, | |
| tokenizer=self._tokenizer, | |
| dataset=eval_dataset, | |
| ) | |
| results.metrics[metric.value] = round(score, 4) | |
| self._log.info("eval.metric", metric=metric.value, score=score) | |
| except Exception as exc: # noqa: BLE001 | |
| msg = f"{metric.value}: {exc}" | |
| results.errors.append(msg) | |
| self._log.warning("eval.metric_failed", metric=metric.value, error=str(exc)) | |
| results.duration_seconds = time.perf_counter() - start | |
| results.log(self._log) | |
| return results | |
| # --------------------------------------------------------------------------- | |
| # Custom training callback | |
| # --------------------------------------------------------------------------- | |
class CodeCraftLabCallback(TrainerCallback):
    """
    Injects structured logging and eval hooks into the HF Trainer loop.

    At the end of every epoch the eval hook runner is executed and the
    accumulated results are rewritten to ``results_path`` as JSON.
    """

    def __init__(
        self,
        hook_runner: EvalHookRunner,
        eval_dataset: Dataset,
        results_path: Path,
    ) -> None:
        """Store collaborators; no I/O happens until the first epoch ends.

        Args:
            hook_runner: Runner invoked at every epoch end.
            eval_dataset: Dataset handed to the runner on each invocation.
            results_path: JSON file that receives all results collected so far.
        """
        self._runner = hook_runner
        self._eval_dataset = eval_dataset
        self._results_path = results_path
        self._all_results: list[dict[str, Any]] = []
        self._log = log

    @staticmethod
    def _latest_training_loss(state: TrainerState) -> float | None:
        """Return the most recent ``"loss"`` value in ``state.log_history``.

        The last history entry is not guaranteed to carry a ``"loss"`` key,
        so the history is scanned backwards instead of reading only ``[-1]``.
        Returns ``None`` when no entry has a loss.
        """
        for entry in reversed(state.log_history or []):
            if "loss" in entry:
                return entry["loss"]
        return None

    def on_epoch_end(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        model: PreTrainedModel,
        **kwargs: Any,
    ) -> TrainerControl:
        """Log the epoch boundary, run the eval hooks and persist the results."""
        self._log.info(
            "training.epoch_end",
            epoch=state.epoch,
            step=state.global_step,
            loss=self._latest_training_loss(state),
        )
        results = self._runner.run(
            model=model,
            eval_dataset=self._eval_dataset,
            epoch=state.epoch or 0.0,
            step=state.global_step,
        )
        self._all_results.append(results.to_dict())
        self._persist_results()
        return control

    def on_log(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        logs: dict[str, float],
        **kwargs: Any,
    ) -> TrainerControl:
        """Forward the Trainer's log record as a ``training.log`` event."""
        self._log.info("training.log", step=state.global_step, **logs)
        return control

    def on_train_end(
        self,
        args: TrainingArguments,
        state: TrainerState,
        control: TrainerControl,
        **kwargs: Any,
    ) -> TrainerControl:
        """Emit a ``training.completed`` event with the final step and FLOs."""
        self._log.info(
            "training.completed",
            total_steps=state.global_step,
            total_flos=state.total_flos,
        )
        return control

    def _persist_results(self) -> None:
        """Rewrite the results file with everything collected so far."""
        # Make sure the target directory exists so the write cannot fail with
        # FileNotFoundError when the callback is used on a fresh path.
        self._results_path.parent.mkdir(parents=True, exist_ok=True)
        self._results_path.write_text(
            json.dumps(self._all_results, indent=2), encoding="utf-8"
        )
| # --------------------------------------------------------------------------- | |
| # Pipeline | |
| # --------------------------------------------------------------------------- | |
class FineTuningPipeline:
    """
    Orchestrates the full fine-tuning lifecycle.

    Usage:
        config = TrainingJobConfig.model_validate(raw_dict)
        pipeline = FineTuningPipeline(config)
        pipeline.run()
    """

    def __init__(self, config: TrainingJobConfig) -> None:
        """Bind the logger and derive the per-job output directory.

        Args:
            config: Validated job configuration. Outputs are written below
                ``config.checkpoint.output_dir / config.job_name``.
        """
        self._config = config
        self._log = log.bind(job=config.job_name, model=config.base_model)
        self._output_dir = Path(config.checkpoint.output_dir) / config.job_name

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def run(self) -> Path:
        """Execute all pipeline stages. Returns the final checkpoint path."""
        self._log.info("pipeline.started")
        self._preflight()
        datasets = self._prepare_datasets()
        model, tokenizer = self._load_model()
        self._train(model, tokenizer, datasets)
        final_path = self._export(model, tokenizer)
        self._log.info("pipeline.finished", output=str(final_path))
        return final_path

    # ------------------------------------------------------------------
    # Stage 1: Preflight
    # ------------------------------------------------------------------
    def _preflight(self) -> None:
        """Validate the environment before any expensive work starts.

        Re-validates the config, logs the GPU (or warns when none is
        available), warns when free disk space is below 20 GB, and creates
        the output directory.

        Raises:
            EnvironmentError: If a Hub push is requested but ``HF_TOKEN`` is
                not set in the environment.
        """
        self._log.info("pipeline.preflight")

        # Validate config (already done at submission, but be defensive)
        self._config.model_validate(self._config.model_dump())

        # GPU check
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name(0)
            vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
            self._log.info("preflight.gpu", device=device_name, vram_gb=round(vram_gb, 1))
        else:
            self._log.warning("preflight.no_gpu", message="Training on CPU — will be slow")

        # Disk space (rough check — 20 GB minimum).
        # The output directory is only created at the end of preflight, so its
        # parent may not exist yet and shutil.disk_usage would raise
        # FileNotFoundError. Probe the nearest ancestor that does exist.
        probe = self._output_dir.parent
        while not probe.exists() and probe != probe.parent:
            probe = probe.parent
        free_gb = shutil.disk_usage(probe).free / 1e9
        if free_gb < 20:
            self._log.warning("preflight.disk_low", free_gb=round(free_gb, 1))

        # HF token if pushing
        if self._config.hub.push_to_hub and not os.environ.get("HF_TOKEN"):
            raise EnvironmentError("HF_TOKEN is required when hub.push_to_hub=true")

        self._output_dir.mkdir(parents=True, exist_ok=True)
        self._log.info("preflight.passed")

    # ------------------------------------------------------------------
    # Stage 2: Dataset preparation
    # ------------------------------------------------------------------
    def _prepare_datasets(self) -> DatasetDict:
        """Load the dataset, optionally cap and shuffle it, and split it.

        Returns:
            A ``DatasetDict`` with ``"train"`` and ``"eval"`` splits. The
            first ``int(len * split_ratio)`` rows form the train split and
            the remainder the eval split.
        """
        self._log.info("pipeline.dataset_prep")
        ds_cfg = self._config.dataset

        # Load — support both HF Hub paths and internal dataset IDs
        raw: Dataset
        if ds_cfg.dataset_id.startswith("ds_"):
            # Internal dataset — load from local store
            raw = Dataset.load_from_disk(f"./data/{ds_cfg.dataset_id}")
        else:
            raw = load_dataset(ds_cfg.dataset_id, split="train")  # type: ignore[assignment]

        # The cap is applied before shuffling, so it keeps the first
        # ``max_samples`` rows of the stored order.
        if ds_cfg.max_samples:
            raw = raw.select(range(min(ds_cfg.max_samples, len(raw))))
        if ds_cfg.shuffle:
            raw = raw.shuffle(seed=ds_cfg.shuffle_seed)

        n_train = int(len(raw) * ds_cfg.split_ratio)
        splits = DatasetDict(
            {
                "train": raw.select(range(n_train)),
                "eval": raw.select(range(n_train, len(raw))),
            }
        )
        self._log.info(
            "dataset.prepared",
            train_size=len(splits["train"]),
            eval_size=len(splits["eval"]),
            column=ds_cfg.text_column,
        )
        return splits

    # ------------------------------------------------------------------
    # Stage 3: Model initialisation
    # ------------------------------------------------------------------
    def _load_model(self) -> tuple[PreTrainedModel, PreTrainedTokenizerBase]:
        """Load the base model and tokenizer and optionally wrap the model in LoRA.

        Returns:
            ``(model, tokenizer)``. When ``config.lora`` is present and
            enabled, the model is the PEFT-wrapped model.
        """
        self._log.info("pipeline.model_load")
        hp = self._config.training

        dtype_map = {
            "fp32": torch.float32,
            "fp16": torch.float16,
            "bf16": torch.bfloat16,
        }
        # Unknown precision values fall back to bfloat16.
        torch_dtype = dtype_map.get(hp.precision.value, torch.bfloat16)

        tokenizer = AutoTokenizer.from_pretrained(self._config.base_model)
        if tokenizer.pad_token is None:
            # Reuse EOS as the padding token when the tokenizer defines none.
            tokenizer.pad_token = tokenizer.eos_token

        model = AutoModelForCausalLM.from_pretrained(
            self._config.base_model,
            torch_dtype=torch_dtype,
            device_map="auto" if torch.cuda.is_available() else "cpu",
        )

        if self._config.lora and self._config.lora.enabled:
            lora_cfg = self._config.lora
            peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                r=lora_cfg.r,
                lora_alpha=lora_cfg.alpha,
                lora_dropout=lora_cfg.dropout,
                target_modules=lora_cfg.target_modules,
                bias=lora_cfg.bias,  # type: ignore[arg-type]
            )
            model = get_peft_model(model, peft_config)
            trainable, total = model.get_nb_trainable_parameters()
            self._log.info(
                "model.lora_applied",
                trainable_params=trainable,
                total_params=total,
                trainable_pct=round(100 * trainable / total, 2),
            )
        else:
            self._log.info("model.full_finetune")

        return model, tokenizer  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Stage 4: Training
    # ------------------------------------------------------------------
    def _train(
        self,
        model: PreTrainedModel,
        tokenizer: PreTrainedTokenizerBase,
        datasets: DatasetDict,
    ) -> None:
        """Tokenize the splits and run the HF ``Trainer`` with the eval callback.

        Args:
            model: Model to train (modified in place by training).
            tokenizer: Tokenizer used for tokenization and collation.
            datasets: ``DatasetDict`` with ``"train"`` and ``"eval"`` splits.
        """
        self._log.info("pipeline.training_start")
        hp = self._config.training
        ckpt = self._config.checkpoint
        eval_cfg = self._config.evaluation

        def tokenize(examples: dict[str, list[str]]) -> dict[str, Any]:
            # No padding here: the data collator handles batching.
            return tokenizer(
                examples[self._config.dataset.text_column],
                truncation=True,
                max_length=hp.max_seq_length,
                padding=False,
            )

        tokenized = datasets.map(
            tokenize,
            batched=True,
            remove_columns=datasets["train"].column_names,
        )

        training_args = TrainingArguments(
            output_dir=str(self._output_dir),
            num_train_epochs=hp.num_epochs,
            per_device_train_batch_size=hp.batch_size,
            per_device_eval_batch_size=hp.batch_size,
            gradient_accumulation_steps=hp.gradient_accumulation_steps,
            learning_rate=hp.learning_rate,
            weight_decay=hp.weight_decay,
            warmup_ratio=hp.warmup_ratio,
            max_grad_norm=hp.max_grad_norm,
            optim=hp.optimizer.value,
            lr_scheduler_type=hp.lr_scheduler,
            fp16=hp.precision.value == "fp16",
            bf16=hp.precision.value == "bf16",
            # NOTE(review): newer transformers releases appear to rename this
            # keyword to `eval_strategy` — confirm against the pinned version.
            evaluation_strategy=eval_cfg.strategy.value,
            eval_steps=eval_cfg.eval_steps,
            save_strategy=ckpt.save_strategy.value,
            save_steps=ckpt.save_steps,
            save_total_limit=ckpt.save_total_limit,
            load_best_model_at_end=eval_cfg.load_best_model_at_end,
            metric_for_best_model=eval_cfg.metric_for_best_model.value,
            greater_is_better=eval_cfg.greater_is_better,
            seed=hp.seed,
            dataloader_num_workers=hp.dataloader_num_workers,
            report_to="none",  # structlog handles all logging
            logging_steps=10,
            resume_from_checkpoint=ckpt.resume_from_checkpoint,
            push_to_hub=False,  # push handled separately in export stage
        )

        hook_runner = EvalHookRunner(self._config, tokenizer)
        results_path = self._output_dir / "eval_results.json"
        # The callback receives the raw (untokenized) eval split; the Trainer
        # itself evaluates on the tokenized one.
        callback = CodeCraftLabCallback(
            hook_runner=hook_runner,
            eval_dataset=datasets["eval"],
            results_path=results_path,
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized["train"],
            eval_dataset=tokenized["eval"],
            data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
            callbacks=[callback],
        )
        trainer.train(resume_from_checkpoint=ckpt.resume_from_checkpoint)

    # ------------------------------------------------------------------
    # Stage 5: Export + Hub push
    # ------------------------------------------------------------------
    def _export(self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase) -> Path:
        """Save model and tokenizer under ``<output_dir>/final`` and optionally push.

        The Hub push only happens when both ``hub.push_to_hub`` and
        ``hub.repo_id`` are set.

        Returns:
            Path of the directory the model and tokenizer were saved to.
        """
        self._log.info("pipeline.export")
        final_path = self._output_dir / "final"
        model.save_pretrained(str(final_path))
        tokenizer.save_pretrained(str(final_path))
        self._log.info("model.saved", path=str(final_path))

        hub_cfg = self._config.hub
        if hub_cfg.push_to_hub and hub_cfg.repo_id:
            self._log.info("hub.pushing", repo_id=hub_cfg.repo_id)
            model.push_to_hub(hub_cfg.repo_id, private=hub_cfg.private)
            tokenizer.push_to_hub(hub_cfg.repo_id, private=hub_cfg.private)
            self._log.info("hub.pushed", repo_id=hub_cfg.repo_id)

        return final_path