AntiAtropos / training /model_utils.py
div18
code
e890160
"""
model_utils.py — Model loading, LoRA setup, GPU detection, checkpoint resume.
All persistent state lives on Hugging Face Hub.
GPU is ephemeral — we only ever write to Hub, never assume local disk survives.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import torch
from huggingface_hub import HfApi, snapshot_download
# ────────────────────────────────────────────────
# GPU Detection
# ────────────────────────────────────────────────
def detect_gpu_tier() -> str:
    """Classify CUDA device 0 into a coarse capability tier.

    Returns:
        ``"a100"`` when the device name contains "a100" or the device
        reports at least 70 GB of memory; ``"a10g"`` when the name contains
        "a10" or it reports at least 20 GB; ``"t4"`` otherwise, including
        the case where CUDA is not available at all.
    """
    if not torch.cuda.is_available():
        print("[model_utils] No CUDA detected — will be extremely slow")
        return "t4"

    # The device-properties field is ``total_memory`` (bytes). ``total_mem``
    # does not exist and raised AttributeError on every GPU host.
    vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    name = torch.cuda.get_device_name(0).lower()

    # Order matters: "a10" is a substring of "a100", so test A100 first.
    if "a100" in name or vram_gb >= 70:
        return "a100"
    if "a10" in name or vram_gb >= 20:
        return "a10g"
    return "t4"
def gpu_scaled_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Write GPU-tier training defaults into ``cfg`` and return it.

    The tier comes from ``detect_gpu_tier()``. Four keys are set:
    ``max_seq_length``, ``lora_rank``, ``lora_alpha`` and
    ``per_device_train_batch_size``. For each key, an environment variable
    named ``ANTIATROPOS_<KEY_UPPERCASED>`` takes precedence over the tier
    default. Any value already present in ``cfg`` is overwritten, and a
    message is printed when the value changes.

    Rationale kept from the original author: SRE actions are short
    (~40 tokens of output) and the bottleneck is env API latency, not
    context length, so the sequence length stays tight (1024 covers the
    ~740-token observed peak) and VRAM is spent on rank and batch instead.

    Args:
        cfg: Training configuration; mutated in place.

    Returns:
        The same ``cfg`` object.

    Raises:
        ValueError: If an override environment variable is set but cannot
            be converted to the numeric type of the tier default.
    """
    tier = detect_gpu_tier()

    if tier == "a100":
        # Only the A100 has VRAM to spare: 2048 leaves room for few-shot demos.
        seq_len, rank, batch = 2048, 48, 4
    elif tier == "a10g":
        seq_len, rank, batch = 1024, 32, 2
    else:  # t4
        seq_len, rank, batch = 1024, 16, 1

    # lora_alpha mirrors lora_rank on every tier.
    tier_defaults: Dict[str, Any] = {
        "max_seq_length": seq_len,
        "lora_rank": rank,
        "lora_alpha": rank,
        "per_device_train_batch_size": batch,
    }

    for key, tier_value in tier_defaults.items():
        chosen = tier_value
        raw = os.environ.get(f"ANTIATROPOS_{key.upper()}")
        if raw is not None:
            # Coerce the env string to the same numeric type as the default.
            if isinstance(tier_value, int):
                chosen = int(raw)
            elif isinstance(tier_value, float):
                chosen = float(raw)

        if key in cfg and cfg[key] != chosen:
            print(f"[model_utils] GPU {tier}: overriding {key} "
                  f"{cfg[key]} -> {chosen}")
        cfg[key] = chosen

    return cfg
# ────────────────────────────────────────────────
# Model Loading
# ────────────────────────────────────────────────
def load_base_model(cfg: Dict[str, Any]):
    """Load the base model and tokenizer through Unsloth.

    Args:
        cfg: Configuration dict. ``base_model`` is required;
            ``max_seq_length`` defaults to 1024 and ``load_in_4bit``
            defaults to True.

    Returns:
        A ``(model, tokenizer)`` tuple as produced by
        ``FastLanguageModel.from_pretrained``.

    Raises:
        KeyError: If ``cfg`` has no ``base_model`` entry.
    """
    # Imported lazily so the module can be imported without unsloth installed.
    from unsloth import FastLanguageModel

    base_name = cfg["base_model"]
    seq_len = cfg.get("max_seq_length", 1024)
    use_4bit = cfg.get("load_in_4bit", True)

    print(f"[model_utils] Loading {base_name} "
          f"(seq_len={seq_len}, 4bit={use_4bit})")

    load_kwargs = {
        "model_name": base_name,
        "max_seq_length": seq_len,
        "load_in_4bit": use_4bit,
        "dtype": None,  # auto-detect bf16/fp16
        # NOTE(review): this allows code shipped in the model repo to run at
        # load time; confirm base_model always points at a trusted source.
        "trust_remote_code": True,
    }
    model, tokenizer = FastLanguageModel.from_pretrained(**load_kwargs)
    return model, tokenizer
def attach_lora(model, cfg: Dict[str, Any], seed: int = 42):
    """Attach LoRA adapters to the base model via Unsloth.

    Args:
        model: Base model to wrap.
        cfg: Configuration dict. Reads ``lora_rank`` (default 32),
            ``lora_alpha`` (default 32), ``lora_dropout`` (default 0.0) and
            ``lora_target_modules`` (default: the attention and MLP
            projection layers listed below).
        seed: Passed to Unsloth as ``random_state``.

    Returns:
        The model returned by ``FastLanguageModel.get_peft_model``.
    """
    # Imported lazily so the module can be imported without unsloth installed.
    from unsloth import FastLanguageModel

    rank = cfg.get("lora_rank", 32)
    alpha = cfg.get("lora_alpha", 32)
    dropout = cfg.get("lora_dropout", 0.0)
    target_modules = cfg.get("lora_target_modules", [
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ])

    print(f"[model_utils] Attaching LoRA: rank={rank}, alpha={alpha}, "
          f"dropout={dropout}, targets={len(target_modules)} modules")

    model = FastLanguageModel.get_peft_model(
        model,
        r=rank,
        lora_alpha=alpha,
        lora_dropout=dropout,
        target_modules=target_modules,
        bias="none",
        use_gradient_checkpointing="unsloth",
        random_state=seed,
    )

    if torch.cuda.is_available():
        vram_used = torch.cuda.memory_allocated() / 1e9
        # The device-properties field is ``total_memory``; ``total_mem`` does
        # not exist and raised AttributeError right after the adapters were
        # attached.
        vram_total = torch.cuda.get_device_properties(0).total_memory / 1e9
        # Both figures are bytes / 1e9, i.e. decimal gigabytes, not GiB.
        print(f"[model_utils] VRAM: {vram_used:.2f} / {vram_total:.2f} GB")

    # Single pass over the parameters for both counts.
    trainable = 0
    total = 0
    for param in model.parameters():
        count = param.numel()
        total += count
        if param.requires_grad:
            trainable += count

    # Guard against a parameter-less model so the report cannot divide by zero.
    pct = 100 * trainable / total if total else 0.0
    print(f"[model_utils] Trainable: {trainable:,} / {total:,} "
          f"({pct:.2f}%)")
    return model
# ────────────────────────────────────────────────
# Checkpoint Resume
# ────────────────────────────────────────────────
def find_latest_checkpoint(hub_repo: str) -> Optional[str]:
    """Return the newest ``checkpoint-<step>`` folder name in a Hub model repo.

    Checkpoints are expected at ``<hub_repo>/checkpoint-<step>/...``. Only
    top-level folders whose suffix is made entirely of ASCII digits count,
    so names such as ``checkpoint-12-old`` are ignored rather than being
    mistaken for step 12. The returned value is the folder name exactly as
    it appears in the repo (zero padding such as ``checkpoint-007`` is
    preserved), so it can be passed straight to a download call.

    Args:
        hub_repo: Hub repo id. A falsy value means no repo is configured.

    Returns:
        The folder name of the highest-numbered checkpoint, or ``None`` when
        no repo is configured, no checkpoint exists, or the Hub listing
        fails.
    """
    if not hub_repo:
        return None

    prefix = "checkpoint-"
    try:
        # Best-effort on purpose: any failure to list the repo is reported
        # and treated as "no checkpoint" instead of aborting the caller.
        files = HfApi().list_repo_files(hub_repo, repo_type="model")
    except Exception as e:
        print(f"[model_utils] Could not check Hub for checkpoints: {e}")
        return None

    # step number -> folder name as it appears in the repo
    step_to_dir: Dict[int, str] = {}
    for repo_file in files:
        # e.g. checkpoint-123/adapter_model.safetensors
        top_dir, sep, _rest = repo_file.partition("/")
        if not sep or not top_dir.startswith(prefix):
            continue  # root-level file, or not a checkpoint folder
        suffix = top_dir[len(prefix):]
        if suffix.isascii() and suffix.isdigit():
            step_to_dir[int(suffix)] = top_dir

    if not step_to_dir:
        return None

    ckpt_path = step_to_dir[max(step_to_dir)]
    print(f"[model_utils] Found Hub checkpoint: {hub_repo}/{ckpt_path}")
    return ckpt_path
def download_checkpoint(hub_repo: str, checkpoint_path: str,
                        local_dir: str = "/tmp/antiatropos_ckpt") -> str:
    """Fetch one checkpoint folder from a Hub model repo to local disk.

    Args:
        hub_repo: Hub repo id to download from.
        checkpoint_path: Checkpoint folder inside the repo, e.g.
            ``checkpoint-123``.
        local_dir: Local root directory to download into.

    Returns:
        ``local_dir`` joined with ``checkpoint_path``, as a string.
    """
    print(f"[model_utils] Downloading checkpoint {hub_repo}/{checkpoint_path}...")

    # Restrict the snapshot to files under the requested checkpoint folder.
    only_this_checkpoint = [f"{checkpoint_path}/*"]
    snapshot_download(
        repo_id=hub_repo,
        repo_type="model",
        local_dir=local_dir,
        allow_patterns=only_this_checkpoint,
    )

    local_ckpt = Path(local_dir) / checkpoint_path
    return str(local_ckpt)
def load_from_checkpoint(model, tokenizer, ckpt_local_path: str):
    """Load LoRA adapter weights from a local checkpoint directory.

    Args:
        model: Model to load the adapter into. Either a PEFT-wrapped model
            or a model exposing a one-argument ``load_adapter(path)``.
        tokenizer: Unused; kept so the signature stays stable for callers.
        ckpt_local_path: Directory containing the saved adapter files.

    Returns:
        The same ``model`` object.
    """
    from peft import PeftModel

    print(f"[model_utils] Loading adapter from {ckpt_local_path}")

    if isinstance(model, PeftModel):
        # PeftModel.load_adapter requires an adapter name, so the bare
        # one-argument call raised TypeError on a LoRA-wrapped model.
        # Reuse the active adapter so the checkpoint overwrites the freshly
        # initialised weights instead of adding a second adapter.
        adapter_name = getattr(model, "active_adapter", None)
        if not isinstance(adapter_name, str):
            adapter_name = "default"
        # is_trainable=True keeps the model in training mode; the default
        # (False) switches it to eval(), which is wrong when resuming.
        model.load_adapter(
            ckpt_local_path,
            adapter_name=adapter_name,
            is_trainable=True,
        )
    else:
        # Non-PEFT models: keep the original call unchanged.
        model.load_adapter(ckpt_local_path)
    return model
# ────────────────────────────────────────────────
# Save & Push
# ────────────────────────────────────────────────
def save_checkpoint(model, tokenizer, output_dir: str, step: int) -> str:
    """Write the model and tokenizer into ``<output_dir>/checkpoint-<step>``.

    The directory (and any missing parents) is created if needed. The model
    is saved first, then the tokenizer, both via ``save_pretrained``.

    Args:
        model: Object exposing ``save_pretrained(path)``.
        tokenizer: Object exposing ``save_pretrained(path)``.
        output_dir: Root directory for checkpoints.
        step: Training step used in the folder name.

    Returns:
        The checkpoint directory path as a string.
    """
    target = Path(output_dir).joinpath(f"checkpoint-{step}")
    target.mkdir(parents=True, exist_ok=True)

    ckpt_dir = str(target)
    for artifact in (model, tokenizer):
        artifact.save_pretrained(ckpt_dir)

    print(f"[model_utils] Checkpoint saved: {ckpt_dir}")
    return ckpt_dir
def push_to_hub(local_dir: str, hub_repo: str, commit_message: str = "") -> None:
    """Upload a local directory to a Hub model repo, best-effort.

    Nothing is uploaded when ``hub_repo`` is falsy. Any exception raised
    while importing or calling ``upload_folder`` is printed and swallowed,
    so this function never raises to the caller.

    Args:
        local_dir: Directory whose contents are uploaded.
        hub_repo: Target Hub repo id.
        commit_message: Commit message; a default is used when empty.
    """
    if hub_repo:
        message = (commit_message if commit_message
                   else "Upload from AntiAtropos training")
        try:
            from huggingface_hub import upload_folder

            upload_folder(
                folder_path=local_dir,
                repo_id=hub_repo,
                repo_type="model",
                commit_message=message,
            )
            print(f"[model_utils] Pushed to {hub_repo}")
        except Exception as e:
            print(f"[model_utils] Push failed: {e}")
    else:
        print("[model_utils] No hub_model_repo configured, skipping push")
def push_adapter_to_hub(model, tokenizer, hub_repo: str,
                        step: int, output_dir: str = "/tmp/antiatropos_final") -> None:
    """Save the final adapter and tokenizer locally, then push them to the Hub.

    When ``hub_repo`` is falsy nothing is written or uploaded; only a notice
    is printed.

    Args:
        model: Object exposing ``save_pretrained(path)``.
        tokenizer: Object exposing ``save_pretrained(path)``.
        hub_repo: Target Hub repo id.
        step: Training step, recorded in the commit message.
        output_dir: Local staging directory for the files to upload.
    """
    if hub_repo:
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        for artifact in (model, tokenizer):
            artifact.save_pretrained(output_dir)
        push_to_hub(output_dir, hub_repo, f"AntiAtropos QLoRA step {step}")
    else:
        print("[model_utils] No hub_model_repo configured, skipping final push")