| """ |
| model_utils.py β Model loading, LoRA setup, GPU detection, checkpoint resume. |
| |
| All persistent state lives on Hugging Face Hub. |
| GPU is ephemeral β we only ever write to Hub, never assume local disk survives. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| from pathlib import Path |
| from typing import Any, Dict, Optional, Tuple |
|
|
| import torch |
| from huggingface_hub import HfApi, snapshot_download |
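
# Typical setup order (sketch only; `cfg` is assumed to be the project's
# training-config dict, loaded elsewhere):
#   cfg = gpu_scaled_config(cfg)
#   model, tokenizer = load_base_model(cfg)
#   model = attach_lora(model, cfg)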


def detect_gpu_tier() -> str:
    """Return 'a100', 'a10g', or 't4' based on GPU name and VRAM."""
    if not torch.cuda.is_available():
        print("[model_utils] No CUDA detected - training will be extremely slow")
        return "t4"
    vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    name = torch.cuda.get_device_name(0).lower()
    if "a100" in name or vram_gb >= 70:    # 80 GB-class cards
        return "a100"
    elif "a10" in name or vram_gb >= 20:   # 24 GB-class cards (A10G)
        return "a10g"
    else:                                  # 16 GB T4 or smaller
        return "t4"


def gpu_scaled_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Auto-scale LoRA rank and seq_length based on GPU tier.

    SRE actions are short (~40 tokens of output). The bottleneck is
    env API latency, not context length, so we keep seq_len tight
    (1024 is plenty) and spend VRAM on rank + episodes instead.
    """
    tier = detect_gpu_tier()
    overrides: Dict[str, Any] = {}

    if tier == "a100":
        overrides["max_seq_length"] = 2048
        overrides["lora_rank"] = 48
        overrides["lora_alpha"] = 48
        overrides["per_device_train_batch_size"] = 4
    elif tier == "a10g":
        overrides["max_seq_length"] = 1024
        overrides["lora_rank"] = 32
        overrides["lora_alpha"] = 32
        overrides["per_device_train_batch_size"] = 2
    else:
        overrides["max_seq_length"] = 1024
        overrides["lora_rank"] = 16
        overrides["lora_alpha"] = 16
        overrides["per_device_train_batch_size"] = 1

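    # Any scaled value can be pinned with an environment variable named
    # ANTIATROPOS_<KEY>, matching the lookup below, e.g. (hypothetical
    # invocation; the entry-point script name is a placeholder):
    #   ANTIATROPOS_LORA_RANK=64 ANTIATROPOS_MAX_SEQ_LENGTH=2048 python train.py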
    for key, default_val in overrides.items():
        env_key = f"ANTIATROPOS_{key.upper()}"
        if env_key in os.environ:
            val = os.environ[env_key]
            if isinstance(default_val, int):
                overrides[key] = int(val)
            elif isinstance(default_val, float):
                overrides[key] = float(val)
        if key in cfg and cfg[key] != overrides[key]:
            print(f"[model_utils] GPU {tier}: overriding {key} "
                  f"{cfg[key]} -> {overrides[key]}")
        cfg[key] = overrides[key]

    return cfg


def load_base_model(cfg: Dict[str, Any]):
    """Load the base model with Unsloth QLoRA. Returns (model, tokenizer)."""
    from unsloth import FastLanguageModel

    model_name = cfg["base_model"]
    max_seq_length = cfg.get("max_seq_length", 1024)
    load_in_4bit = cfg.get("load_in_4bit", True)

    print(f"[model_utils] Loading {model_name} "
          f"(seq_len={max_seq_length}, 4bit={load_in_4bit})")

    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name=model_name,
        max_seq_length=max_seq_length,
        load_in_4bit=load_in_4bit,
        dtype=None,  # None lets Unsloth auto-detect fp16/bf16 for the GPU
        trust_remote_code=True,
    )

    return model, tokenizer


def attach_lora(model, cfg: Dict[str, Any], seed: int = 42):
    """Attach LoRA adapters to the base model."""
    from unsloth import FastLanguageModel

    rank = cfg.get("lora_rank", 32)
    alpha = cfg.get("lora_alpha", 32)
    dropout = cfg.get("lora_dropout", 0.0)
    target_modules = cfg.get("lora_target_modules", [
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ])
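    # The defaults cover the Llama-style attention and MLP projections;
    # override cfg["lora_target_modules"] if the base model uses other names.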

    print(f"[model_utils] Attaching LoRA: rank={rank}, alpha={alpha}, "
          f"dropout={dropout}, targets={len(target_modules)} modules")

    model = FastLanguageModel.get_peft_model(
        model,
        r=rank,
        lora_alpha=alpha,
        lora_dropout=dropout,
        target_modules=target_modules,
        bias="none",
        use_gradient_checkpointing="unsloth",
        random_state=seed,
    )

    if torch.cuda.is_available():
        vram_used = torch.cuda.memory_allocated() / 1e9
        vram_total = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f"[model_utils] VRAM: {vram_used:.2f} / {vram_total:.2f} GB")

    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total = sum(p.numel() for p in model.parameters())
    print(f"[model_utils] Trainable: {trainable:,} / {total:,} "
          f"({100 * trainable / total:.2f}%)")

    return model


def find_latest_checkpoint(hub_repo: str) -> Optional[str]:
    """Check the Hub for the latest checkpoint subfolder.

    Checkpoints are stored as: <hub_repo>/checkpoint-<step>/
    Returns the latest checkpoint subfolder name (e.g. "checkpoint-200"),
    or None if no checkpoint exists.
    """
    if not hub_repo:
        return None

    try:
        api = HfApi()
        files = api.list_repo_files(hub_repo, repo_type="model")
        checkpoint_dirs = set()
        for f in files:
            # Repo files look like "checkpoint-<step>/adapter_model.safetensors".
            parts = f.split("/")
            if len(parts) >= 2 and parts[0].startswith("checkpoint-"):
                try:
                    step = int(parts[0].split("-")[1])
                    checkpoint_dirs.add(step)
                except (ValueError, IndexError):
                    continue

        if not checkpoint_dirs:
            return None

        latest_step = max(checkpoint_dirs)
        ckpt_path = f"checkpoint-{latest_step}"
        print(f"[model_utils] Found Hub checkpoint: {hub_repo}/{ckpt_path}")
        return ckpt_path

    except Exception as e:
        print(f"[model_utils] Could not check Hub for checkpoints: {e}")
        return None


def download_checkpoint(hub_repo: str, checkpoint_path: str,
                        local_dir: str = "/tmp/antiatropos_ckpt") -> str:
    """Download a checkpoint from the Hub to local disk.

    Returns the local path containing the adapter files.
    """
    print(f"[model_utils] Downloading checkpoint {hub_repo}/{checkpoint_path}...")
    snapshot_download(
        repo_id=hub_repo,
        repo_type="model",
        local_dir=local_dir,
        allow_patterns=[f"{checkpoint_path}/*"],
    )
    return str(Path(local_dir) / checkpoint_path)


def load_from_checkpoint(model, tokenizer, ckpt_local_path: str):
    """Load LoRA weights from a local checkpoint directory into the live adapter."""
    from peft import set_peft_model_state_dict
    from peft.utils import load_peft_weights

    print(f"[model_utils] Loading adapter from {ckpt_local_path}")
    # Load the saved LoRA weights into the adapter already attached by
    # attach_lora(); PeftModel.load_adapter would instead register a second,
    # separately named adapter rather than resuming the existing one.
    adapter_weights = load_peft_weights(ckpt_local_path)
    set_peft_model_state_dict(model, adapter_weights)
    return model
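
# Typical resume flow (sketch; the repo id is a placeholder):
#   ckpt = find_latest_checkpoint("your-org/antiatropos-lora")
#   if ckpt:
#       local = download_checkpoint("your-org/antiatropos-lora", ckpt)
#       model = load_from_checkpoint(model, tokenizer, local)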


def save_checkpoint(model, tokenizer, output_dir: str, step: int) -> str:
    """Save the adapter + tokenizer locally. Returns the checkpoint path."""
    ckpt_dir = str(Path(output_dir) / f"checkpoint-{step}")
    Path(ckpt_dir).mkdir(parents=True, exist_ok=True)

    model.save_pretrained(ckpt_dir)
    tokenizer.save_pretrained(ckpt_dir)
    print(f"[model_utils] Checkpoint saved: {ckpt_dir}")
    return ckpt_dir


def push_to_hub(local_dir: str, hub_repo: str, commit_message: str = "") -> None:
    """Push a local directory to a Hub model repo."""
    if not hub_repo:
        print("[model_utils] No hub_model_repo configured, skipping push")
        return

    try:
        from huggingface_hub import upload_folder

        upload_folder(
            folder_path=local_dir,
            repo_id=hub_repo,
            repo_type="model",
            commit_message=commit_message or "Upload from AntiAtropos training",
        )
        print(f"[model_utils] Pushed to {hub_repo}")
    except Exception as e:
        print(f"[model_utils] Push failed: {e}")


def push_adapter_to_hub(model, tokenizer, hub_repo: str,
                        step: int, output_dir: str = "/tmp/antiatropos_final") -> None:
    """Save the final adapter and push it to the Hub."""
    if not hub_repo:
        print("[model_utils] No hub_model_repo configured, skipping final push")
        return

    Path(output_dir).mkdir(parents=True, exist_ok=True)
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    push_to_hub(output_dir, hub_repo, f"AntiAtropos QLoRA step {step}")
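
# Periodic checkpointing sketch (paths and repo id are illustrative):
#   save_checkpoint(model, tokenizer, "/tmp/antiatropos_out", step=200)
#   push_to_hub("/tmp/antiatropos_out", "your-org/antiatropos-lora", "checkpoint-200")
# Pushing the parent directory preserves the checkpoint-<step>/ layout that
# find_latest_checkpoint expects on the Hub.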