vit-beans-v3 / trainer_v5_alpha_cutmix.py
AbstractPhil's picture
Create trainer_v5_alpha_cutmix.py
034ffc8 verified
# train_cantor_fusion_hf.py - PRODUCTION WITH ADAMW + WARM RESTARTS + LR BOOST
"""
Cantor Fusion Classifier with AdamW + Cosine Warm Restarts + LR Boost
----------------------------------------------------------------------
Features:
- AdamW optimizer (best for ViTs)
- CosineAnnealingWarmRestarts with configurable LR boost at restarts
- restart_lr_mult: Multiply LR at restart points for aggressive exploration
- HuggingFace Hub uploads (ONE shared repo, organized by run)
- TensorBoard logging (loss, accuracy, fusion metrics, LR tracking)
- Easy CIFAR-10/100 switching
- Automatic checkpoint management
- SafeTensors format (ClamAV safe)
New Feature: restart_lr_mult
When restart_lr_mult > 1.0, learning rate at restart is BOOSTED:
- Normal: 3e-4 β†’ 1e-7 β†’ restart at 3e-4
- Boosted (1.5x): 3e-4 β†’ 1e-7 β†’ restart at 4.5e-4 β†’ 1e-7
- Creates wider exploration curves to escape solidified local minima
Author: AbstractPhil
License: MIT
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torch.cuda.amp import autocast, GradScaler
from safetensors.torch import save_file, load_file
import math
import os
import json
from typing import Optional, Dict, List, Tuple, Union
from dataclasses import dataclass, asdict
import time
from pathlib import Path
from tqdm import tqdm
# HuggingFace
from huggingface_hub import HfApi, create_repo, upload_folder, upload_file
import yaml
# Import from your repo
from geovocab2.train.model.layers.attention.cantor_multiheaded_fusion import (
CantorMultiheadFusion,
CantorFusionConfig
)
from geovocab2.shapes.factory.cantor_route_factory import (
CantorRouteFactory,
RouteMode,
SimplexConfig
)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Mixing Augmentations (AlphaMix / Fractal AlphaMix)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def alphamix_data(x, y, alpha_range=(0.3, 0.7), spatial_ratio=0.25):
"""
Standard AlphaMix: Single spatially localized transparent overlay.
Args:
x: Input images [B, C, H, W]
y: Labels [B]
alpha_range: Range for transparency sampling
spatial_ratio: Ratio of image area to overlay
Returns:
composited_x: Mixed images
y_a: Original labels
y_b: Mixed labels
alpha: Effective mixing coefficient
"""
batch_size = x.size(0)
index = torch.randperm(batch_size, device=x.device)
y_a, y_b = y, y[index]
# Sample alpha from Beta distribution
alpha_min, alpha_max = alpha_range
beta_sample = torch.distributions.Beta(2.0, 2.0).sample().item()
alpha = alpha_min + (alpha_max - alpha_min) * beta_sample
# Compute overlay region
_, _, H, W = x.shape
overlay_ratio = torch.sqrt(torch.tensor(spatial_ratio)).item()
overlay_h = int(H * overlay_ratio)
overlay_w = int(W * overlay_ratio)
top = torch.randint(0, H - overlay_h + 1, (1,), device=x.device).item()
left = torch.randint(0, W - overlay_w + 1, (1,), device=x.device).item()
# Blend
composited_x = x.clone()
overlay_region = alpha * x[:, :, top:top+overlay_h, left:left+overlay_w]
background_region = (1 - alpha) * x[index, :, top:top+overlay_h, left:left+overlay_w]
composited_x[:, :, top:top+overlay_h, left:left+overlay_w] = overlay_region + background_region
return composited_x, y_a, y_b, alpha
def alphamix_fractal(
x: torch.Tensor,
y: torch.Tensor,
alpha_range=(0.3, 0.7),
steps_range=(1, 3),
triad_scales=(1/3, 1/9, 1/27),
beta_shape=(2.0, 2.0),
seed: Optional[int] = None,
):
"""
Fractal AlphaMix: Triadic multi-patch overlays aligned to Cantor geometry.
Pure torch, GPU-compatible.
Args:
x: Input images [B, C, H, W]
y: Labels [B]
alpha_range: Range for transparency sampling
steps_range: Range for number of patches to apply
triad_scales: Triadic scales (1/3, 1/9, 1/27 for Cantor-like)
beta_shape: Beta distribution parameters for sampling
seed: Optional random seed
Returns:
x_mix: Mixed images
y_a: Original labels
y_b: Mixed labels
alpha_eff: Effective area-weighted mixing coefficient
"""
if seed is not None:
torch.manual_seed(seed)
B, C, H, W = x.shape
device = x.device
# Permutation for mixing
idx = torch.randperm(B, device=device)
y_a, y_b = y, y[idx]
x_mix = x.clone()
total_area = H * W
# Beta distribution for transparency sampling
k1, k2 = beta_shape
beta_dist = torch.distributions.Beta(k1, k2)
alpha_min, alpha_max = alpha_range
# Storage for effective alpha calculation
alpha_elems = []
area_weights = []
# Sample number of patches (same for all images in batch)
steps = torch.randint(steps_range[0], steps_range[1] + 1, (1,), device=device).item()
for _ in range(steps):
# Choose triadic scale
scale_idx = torch.randint(0, len(triad_scales), (1,), device=device).item()
scale = triad_scales[scale_idx]
# Compute patch dimensions (triadic area)
patch_area = max(1, int(total_area * scale))
side = int(torch.sqrt(torch.tensor(patch_area, dtype=torch.float32)).item())
h = max(1, min(H, side))
w = max(1, min(W, side))
# Random position
top = torch.randint(0, H - h + 1, (1,), device=device).item()
left = torch.randint(0, W - w + 1, (1,), device=device).item()
# Sample transparency from Beta distribution
alpha_raw = beta_dist.sample().item()
alpha = alpha_min + (alpha_max - alpha_min) * alpha_raw
# Track for effective alpha
alpha_elems.append(alpha)
area_weights.append(h * w)
# Blend patches
fg = alpha * x[:, :, top:top + h, left:left + w]
bg = (1 - alpha) * x[idx, :, top:top + h, left:left + w]
x_mix[:, :, top:top + h, left:left + w] = fg + bg
# Compute area-weighted effective alpha
alpha_t = torch.tensor(alpha_elems, dtype=torch.float32, device=device)
area_t = torch.tensor(area_weights, dtype=torch.float32, device=device)
alpha_eff = (alpha_t * area_t).sum() / (area_t.sum() + 1e-12)
alpha_eff = alpha_eff.item()
return x_mix, y_a, y_b, alpha_eff
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Custom Scheduler with LR Boost at Restarts
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class CosineAnnealingWarmRestartsWithBoost(torch.optim.lr_scheduler._LRScheduler):
"""
Cosine Annealing with Warm Restarts and optional LR boost at restart points.
At each restart, the max LR is multiplied by `restart_lr_mult`, creating
wider exploration curves that can help escape solidified local minima.
Args:
optimizer: Wrapped optimizer
T_0: Number of iterations for the first restart
T_mult: Factor to increase T_i after each restart (default: 1)
eta_min: Minimum learning rate (default: 0)
restart_lr_mult: Multiply max LR by this at each restart (default: 1.0)
Values > 1.0 create boosted exploration cycles
last_epoch: The index of last epoch (default: -1)
Example:
>>> scheduler = CosineAnnealingWarmRestartsWithBoost(
... optimizer, T_0=50, T_mult=2, restart_lr_mult=1.5
... )
# Cycle 1: 3e-4 β†’ 1e-7 (50 epochs)
# Restart: LR jumps to 4.5e-4 (1.5x boost)
# Cycle 2: 4.5e-4 β†’ 1e-7 (100 epochs)
# Restart: LR jumps to 6.75e-4 (1.5x boost again)
# Cycle 3: 6.75e-4 β†’ 1e-7 (200 epochs)
"""
def __init__(
self,
optimizer: torch.optim.Optimizer,
T_0: int,
T_mult: float = 1,
eta_min: float = 0,
restart_lr_mult: float = 1.0,
last_epoch: int = -1
):
if T_0 <= 0 or not isinstance(T_0, int):
raise ValueError(f"Expected positive integer T_0, but got {T_0}")
if T_mult < 1:
raise ValueError(f"Expected T_mult >= 1, but got {T_mult}")
if restart_lr_mult <= 0:
raise ValueError(f"Expected positive restart_lr_mult, but got {restart_lr_mult}")
self.T_0 = T_0
self.T_i = T_0
self.T_mult = T_mult
self.eta_min = eta_min
self.restart_lr_mult = restart_lr_mult
self.T_cur = last_epoch
# Track boosted base LRs and restart count
self.current_base_lrs = None
self.restart_count = 0
super().__init__(optimizer, last_epoch)
def get_lr(self):
if self.T_cur == -1:
# First step - return base LRs
return self.base_lrs
# Use boosted base LRs if we've had restarts
if self.current_base_lrs is None:
base_lrs_to_use = self.base_lrs
else:
base_lrs_to_use = self.current_base_lrs
# Cosine annealing from current base LR to eta_min
return [
self.eta_min + (base_lr - self.eta_min) *
(1 + math.cos(math.pi * self.T_cur / self.T_i)) / 2
for base_lr in base_lrs_to_use
]
def step(self, epoch=None):
if epoch is None and self.last_epoch < 0:
epoch = 0
if epoch is None:
epoch = self.last_epoch + 1
self.T_cur = self.T_cur + 1
# Check if we hit a restart point
if self.T_cur >= self.T_i:
# APPLY BOOST HERE before reset
self.restart_count += 1
if self.current_base_lrs is None:
self.current_base_lrs = list(self.base_lrs)
# Boost the base LRs
self.current_base_lrs = [
base_lr * self.restart_lr_mult
for base_lr in self.current_base_lrs
]
# Now reset cycle
self.T_cur = self.T_cur - self.T_i
self.T_i = int(self.T_i * self.T_mult)
else:
if epoch < 0:
raise ValueError(f"Expected non-negative epoch, but got {epoch}")
if epoch >= self.T_0:
if self.T_mult == 1:
self.T_cur = epoch % self.T_0
# Count how many restarts have occurred
self.restart_count = epoch // self.T_0
else:
n = int(math.log((epoch / self.T_0 * (self.T_mult - 1) + 1), self.T_mult))
self.restart_count = n
self.T_cur = epoch - self.T_0 * (self.T_mult ** n - 1) / (self.T_mult - 1)
self.T_i = self.T_0 * self.T_mult ** n
# Apply cumulative boost
if self.current_base_lrs is None:
self.current_base_lrs = [
base_lr * (self.restart_lr_mult ** self.restart_count)
for base_lr in self.base_lrs
]
else:
self.T_i = self.T_0
self.T_cur = epoch
self.last_epoch = math.floor(epoch)
for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
param_group['lr'] = lr
self._last_lr = [group['lr'] for group in self.optimizer.param_groups]
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Configuration
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@dataclass
class CantorTrainingConfig:
"""Complete configuration for Cantor fusion training with AdamW + Warm Restarts."""
# Dataset
dataset: str = "cifar10" # "cifar10" or "cifar100"
num_classes: int = 10
# Architecture
image_size: int = 32
patch_size: int = 4
embed_dim: int = 384
num_fusion_blocks: int = 6
num_heads: int = 8
fusion_window: int = 32
fusion_mode: str = "weighted" # "weighted" or "consciousness"
k_simplex: int = 4
use_beatrix: bool = False
beatrix_tau: float = 0.25
# Optimization
precompute_geometric: bool = True
use_torch_compile: bool = True
use_mixed_precision: bool = False
# Regularization
dropout: float = 0.1
drop_path_rate: float = 0.1
label_smoothing: float = 0.1
# Training - Optimizer (AdamW)
optimizer_type: str = "adamw" # "sgd" or "adamw"
batch_size: int = 128
num_epochs: int = 300
learning_rate: float = 3e-4 # AdamW default
weight_decay: float = 0.05
grad_clip: float = 1.0
# SGD-specific (if needed)
sgd_momentum: float = 0.9
sgd_nesterov: bool = True
# AdamW-specific
adamw_betas: Tuple[float, float] = (0.9, 0.999)
adamw_eps: float = 1e-8
# Learning rate schedule - WARM RESTARTS WITH BOOST
scheduler_type: str = "cosine_restarts" # "multistep", "cosine", "cosine_restarts"
# CosineAnnealingWarmRestarts parameters
restart_period: int = 50 # T_0: epochs until first restart
restart_mult: float = 2.0 # T_mult: multiply period after each restart (can be float like 1.5)
restart_lr_mult: float = 1.0 # NEW: LR multiplier at restarts (>1.0 for boosted exploration)
min_lr: float = 1e-7 # eta_min: minimum learning rate
# MultiStepLR (for SGD fallback)
lr_milestones: List[int] = None
lr_gamma: float = 0.2
# Cosine annealing (regular, no restarts)
warmup_epochs: int = 0
# Data augmentation
use_augmentation: bool = True
use_autoaugment: bool = True
use_cutout: bool = False
cutout_length: int = 16
# Mixing augmentation (AlphaMix / Fractal AlphaMix)
use_mixing: bool = False
mixing_type: str = "alphamix" # "alphamix" or "fractal"
mixing_alpha_range: Tuple[float, float] = (0.3, 0.7)
mixing_spatial_ratio: float = 0.25 # For standard alphamix
mixing_prob: float = 1.0 # Probability of applying mixing
# Fractal AlphaMix specific
fractal_steps_range: Tuple[int, int] = (1, 3)
fractal_triad_scales: Tuple[float, ...] = (1/3, 1/9, 1/27)
# System
device: str = "cuda" if torch.cuda.is_available() else "cpu"
num_workers: int = 8
seed: int = 42
# Paths
weights_dir: str = "weights"
model_name: str = "vit-beans-v3"
run_name: Optional[str] = None # Auto-generated if None
# HuggingFace - ONE SHARED REPO
hf_username: str = "AbstractPhil"
hf_repo_name: Optional[str] = None
upload_to_hf: bool = True
hf_token: Optional[str] = None
# Logging
log_interval: int = 50
save_interval: int = 10
checkpoint_upload_interval: int = 20
def __post_init__(self):
# Auto-set num_classes based on dataset
if self.dataset == "cifar10":
self.num_classes = 10
elif self.dataset == "cifar100":
self.num_classes = 100
else:
raise ValueError(f"Unknown dataset: {self.dataset}")
# Set default milestones if None (for multistep fallback)
if self.lr_milestones is None:
if self.num_epochs >= 200:
self.lr_milestones = [60, 120, 160]
elif self.num_epochs >= 100:
self.lr_milestones = [30, 60, 80]
else:
self.lr_milestones = [
int(self.num_epochs * 0.5),
int(self.num_epochs * 0.75)
]
# Auto-generate run name
if self.run_name is None:
timestamp = time.strftime("%Y%m%d_%H%M%S")
opt_name = self.optimizer_type.upper()
sched_name = "WarmRestart" if self.scheduler_type == "cosine_restarts" else self.scheduler_type
boost_str = f"_boost{self.restart_lr_mult}x" if self.restart_lr_mult > 1.0 else ""
self.run_name = f"{self.dataset}_{self.fusion_mode}_{opt_name}_{sched_name}{boost_str}_{timestamp}"
# ONE SHARED REPO for all runs
if self.hf_repo_name is None:
self.hf_repo_name = self.model_name
# Set HF token from environment if not provided
if self.hf_token is None:
self.hf_token = os.environ.get("HF_TOKEN")
# Calculate derived values
assert self.image_size % self.patch_size == 0
self.num_patches = (self.image_size // self.patch_size) ** 2
self.patch_dim = self.patch_size * self.patch_size * 3
# Create paths
self.output_dir = Path(self.weights_dir) / self.model_name / self.run_name
self.checkpoint_dir = self.output_dir / "checkpoints"
self.tensorboard_dir = self.output_dir / "tensorboard"
# Create directories
self.output_dir.mkdir(parents=True, exist_ok=True)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
self.tensorboard_dir.mkdir(parents=True, exist_ok=True)
def save(self, path: Union[str, Path]):
"""Save config to YAML file."""
path = Path(path)
config_dict = asdict(self)
# Convert tuples to lists for YAML
if 'adamw_betas' in config_dict:
config_dict['adamw_betas'] = list(config_dict['adamw_betas'])
with open(path, 'w') as f:
yaml.dump(config_dict, f, default_flow_style=False)
@classmethod
def load(cls, path: Union[str, Path]):
"""Load config from YAML file."""
path = Path(path)
with open(path, 'r') as f:
config_dict = yaml.safe_load(f)
# Convert lists back to tuples
if 'adamw_betas' in config_dict:
config_dict['adamw_betas'] = tuple(config_dict['adamw_betas'])
return cls(**config_dict)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Model Components (unchanged from previous version)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class PatchEmbedding(nn.Module):
"""Patch embedding layer."""
def __init__(self, config: CantorTrainingConfig):
super().__init__()
self.config = config
self.proj = nn.Conv2d(3, config.embed_dim, kernel_size=config.patch_size, stride=config.patch_size)
self.pos_embed = nn.Parameter(torch.randn(1, config.num_patches, config.embed_dim) * 0.02)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.proj(x)
x = x.flatten(2).transpose(1, 2)
x = x + self.pos_embed
return x
class DropPath(nn.Module):
"""Stochastic depth."""
def __init__(self, drop_prob: float = 0.0):
super().__init__()
self.drop_prob = drop_prob
def forward(self, x):
if self.drop_prob == 0. or not self.training:
return x
keep_prob = 1 - self.drop_prob
shape = (x.shape[0],) + (1,) * (x.ndim - 1)
random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
random_tensor.floor_()
return x.div(keep_prob) * random_tensor
class CantorFusionBlock(nn.Module):
"""Cantor fusion block."""
def __init__(self, config: CantorTrainingConfig, drop_path: float = 0.0):
super().__init__()
self.norm1 = nn.LayerNorm(config.embed_dim)
fusion_config = CantorFusionConfig(
dim=config.embed_dim,
num_heads=config.num_heads,
fusion_window=config.fusion_window,
fusion_mode=config.fusion_mode,
k_simplex=config.k_simplex,
use_beatrix_routing=config.use_beatrix,
use_consciousness_weighting=(config.fusion_mode == "consciousness"),
beatrix_tau=config.beatrix_tau,
use_gating=True,
dropout=config.dropout,
residual=False,
precompute_staircase=config.precompute_geometric,
precompute_routes=config.precompute_geometric,
precompute_distances=config.precompute_geometric,
use_optimized_gather=True,
staircase_cache_sizes=[config.num_patches],
use_torch_compile=config.use_torch_compile
)
self.fusion = CantorMultiheadFusion(fusion_config)
self.norm2 = nn.LayerNorm(config.embed_dim)
mlp_hidden = config.embed_dim * 4
self.mlp = nn.Sequential(
nn.Linear(config.embed_dim, mlp_hidden),
nn.GELU(),
nn.Dropout(config.dropout),
nn.Linear(mlp_hidden, config.embed_dim),
nn.Dropout(config.dropout)
)
self.drop_path = DropPath(drop_path) if drop_path > 0 else nn.Identity()
def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]:
fusion_result = self.fusion(self.norm1(x))
x = x + self.drop_path(fusion_result['output'])
x = x + self.drop_path(self.mlp(self.norm2(x)))
if return_fusion_info:
fusion_info = {
'consciousness': fusion_result.get('consciousness'),
'cantor_measure': fusion_result.get('cantor_measure')
}
return x, fusion_info
return x
class CantorClassifier(nn.Module):
"""Cantor fusion classifier."""
def __init__(self, config: CantorTrainingConfig):
super().__init__()
self.config = config
self.patch_embed = PatchEmbedding(config)
dpr = [x.item() for x in torch.linspace(0, config.drop_path_rate, config.num_fusion_blocks)]
self.blocks = nn.ModuleList([
CantorFusionBlock(config, drop_path=dpr[i])
for i in range(config.num_fusion_blocks)
])
self.norm = nn.LayerNorm(config.embed_dim)
self.head = nn.Linear(config.embed_dim, config.num_classes)
self.apply(self._init_weights)
def _init_weights(self, m):
if isinstance(m, nn.Linear):
nn.init.trunc_normal_(m.weight, std=0.02)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.LayerNorm):
nn.init.constant_(m.bias, 0)
nn.init.constant_(m.weight, 1.0)
elif isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, List[Dict]]]:
x = self.patch_embed(x)
fusion_infos = []
for i, block in enumerate(self.blocks):
if return_fusion_info and i == len(self.blocks) - 1:
x, fusion_info = block(x, return_fusion_info=True)
fusion_infos.append(fusion_info)
else:
x = block(x)
x = self.norm(x)
x = x.mean(dim=1)
logits = self.head(x)
if return_fusion_info:
return logits, fusion_infos
return logits
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# HuggingFace Integration
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class HuggingFaceUploader:
"""Manages HuggingFace Hub uploads to ONE shared repo."""
def __init__(self, config: CantorTrainingConfig):
self.config = config
self.api = HfApi(token=config.hf_token) if config.upload_to_hf else None
self.repo_id = f"{config.hf_username}/{config.hf_repo_name}"
self.run_prefix = f"runs/{config.run_name}"
if config.upload_to_hf:
self._create_repo()
self._update_main_readme()
def _create_repo(self):
"""Create HuggingFace repo if it doesn't exist."""
try:
create_repo(
repo_id=self.repo_id,
token=self.config.hf_token,
exist_ok=True,
private=False
)
print(f"[HF] Repository: https://huggingface.co/{self.repo_id}")
print(f"[HF] Run folder: {self.run_prefix}")
except Exception as e:
print(f"[HF] Warning: Could not create repo: {e}")
def _update_main_readme(self):
"""Create or update the main shared README at repo root."""
if not self.config.upload_to_hf or self.api is None:
return
boost_info = ""
if self.config.restart_lr_mult > 1.0:
boost_info = f"""
### πŸš€ LR Boost at Restarts (NEW!)
This run uses **restart_lr_mult = {self.config.restart_lr_mult}x**:
- Normal restart: 3e-4 β†’ 1e-7 β†’ restart at 3e-4
- **Boosted restart**: 3e-4 β†’ 1e-7 β†’ restart at {self.config.learning_rate * self.config.restart_lr_mult:.2e} ({self.config.restart_lr_mult}x!)
- Creates **wider exploration curves** to escape solidified local minima
- Each restart provides progressively stronger exploration boost
"""
main_readme = f"""---
tags:
- image-classification
- cantor-fusion
- geometric-deep-learning
- safetensors
- vision-transformer
- warm-restarts
library_name: pytorch
datasets:
- cifar10
- cifar100
metrics:
- accuracy
---
# {self.config.hf_repo_name}
**Geometric Deep Learning with Cantor Multihead Fusion + AdamW Warm Restarts**
This repository contains multiple training runs using Cantor fusion architecture with pentachoron structures, geometric routing, and **CosineAnnealingWarmRestarts** for automatic exploration cycles.
## Training Strategy: AdamW + Warm Restarts
This model uses **AdamW with Cosine Annealing Warm Restarts** (SGDR):
- **Drop phase**: LR decays from {self.config.learning_rate} β†’ {self.config.min_lr} over {self.config.restart_period} epochs
- **Restart phase**: LR jumps back to {self.config.learning_rate} to explore new regions
- **Cycle multiplier**: Each cycle is {self.config.restart_mult}x longer than previous
- **Benefits**: Automatic exploration + exploitation, finds better minima, robust training
{boost_info}
### Restart Schedule
```
Epochs 0-{self.config.restart_period}: LR: {self.config.learning_rate} β†’ {self.config.min_lr} (first cycle)
Epoch {self.config.restart_period}: LR: RESTART to {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} πŸ”„
Epochs {self.config.restart_period}-{self.config.restart_period * (1 + self.config.restart_mult)}: LR: {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} β†’ {self.config.min_lr} (longer cycle)
...
```
## Current Run
**Latest**: `{self.config.run_name}`
- **Dataset**: {self.config.dataset.upper()}
- **Fusion Mode**: {self.config.fusion_mode}
- **Optimizer**: AdamW (adaptive moments)
- **Scheduler**: CosineAnnealingWarmRestarts
- **Restart LR Mult**: {self.config.restart_lr_mult}x
- **Architecture**: {self.config.num_fusion_blocks} blocks, {self.config.num_heads} heads
- **Simplex**: {self.config.k_simplex}-simplex ({self.config.k_simplex + 1} vertices)
## Architecture
The Cantor Fusion architecture uses:
- **Geometric Routing**: Pentachoron (5-simplex) structures for token routing
- **Cantor Multihead Fusion**: Multiple fusion heads with geometric attention
- **Beatrix Consciousness Routing**: Optional consciousness-aware token fusion
- **SafeTensors Format**: All model weights use SafeTensors (not pickle)
## Usage
```python
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
model_path = hf_hub_download(
repo_id="{self.repo_id}",
filename="runs/YOUR_RUN_NAME/checkpoints/best_model.safetensors"
)
state_dict = load_file(model_path)
model.load_state_dict(state_dict)
```
## Citation
```bibtex
@misc{{{self.config.hf_repo_name.replace('-', '_')},
author = {{AbstractPhil}},
title = {{{self.config.hf_repo_name}: Geometric Deep Learning with Warm Restarts}},
year = {{2025}},
publisher = {{HuggingFace}},
url = {{https://huggingface.co/{self.repo_id}}}
}}
```
---
**Repository maintained by**: [@{self.config.hf_username}](https://huggingface.co/{self.config.hf_username})
**Latest update**: {time.strftime("%Y-%m-%d %H:%M:%S")}
"""
main_readme_path = Path(self.config.weights_dir) / self.config.model_name / "MAIN_README.md"
main_readme_path.parent.mkdir(parents=True, exist_ok=True)
with open(main_readme_path, 'w') as f:
f.write(main_readme)
try:
upload_file(
path_or_fileobj=str(main_readme_path),
path_in_repo="README.md",
repo_id=self.repo_id,
token=self.config.hf_token
)
print(f"[HF] Updated main README")
except Exception as e:
print(f"[HF] Main README upload failed: {e}")
def upload_file(self, file_path: Path, repo_path: str):
"""Upload single file to HuggingFace."""
if not self.config.upload_to_hf or self.api is None:
return
try:
if not repo_path.startswith(self.run_prefix) and not repo_path.startswith("runs/"):
full_path = f"{self.run_prefix}/{repo_path}"
else:
full_path = repo_path
upload_file(
path_or_fileobj=str(file_path),
path_in_repo=full_path,
repo_id=self.repo_id,
token=self.config.hf_token
)
print(f"[HF] βœ“ Uploaded: {full_path}")
except Exception as e:
print(f"[HF] βœ— Upload failed ({full_path}): {e}")
def upload_folder_contents(self, folder_path: Path, repo_folder: str):
"""Upload entire folder to HuggingFace."""
if not self.config.upload_to_hf or self.api is None:
return
try:
full_path = f"{self.run_prefix}/{repo_folder}"
upload_folder(
folder_path=str(folder_path),
repo_id=self.repo_id,
path_in_repo=full_path,
token=self.config.hf_token,
ignore_patterns=["*.pyc", "__pycache__"]
)
print(f"[HF] Uploaded folder: {full_path}")
except Exception as e:
print(f"[HF] Folder upload failed: {e}")
def create_model_card(self, trainer_stats: Dict):
"""Create and upload run-specific model card."""
if not self.config.upload_to_hf:
return
boost_section = ""
if self.config.restart_lr_mult > 1.0:
boost_section = f"""
### πŸš€ LR Boost Feature
This run uses **restart_lr_mult = {self.config.restart_lr_mult}x** for aggressive exploration:
**How it works:**
```
Cycle 1: {self.config.learning_rate:.2e} β†’ {self.config.min_lr:.2e} (standard convergence)
Restart: β†’ {self.config.learning_rate * self.config.restart_lr_mult:.2e} (BOOSTED!)
Cycle 2: {self.config.learning_rate * self.config.restart_lr_mult:.2e} β†’ {self.config.min_lr:.2e} (wider exploration)
Restart: β†’ {self.config.learning_rate * (self.config.restart_lr_mult ** 2):.2e} (EVEN MORE BOOSTED!)
Cycle 3: {self.config.learning_rate * (self.config.restart_lr_mult ** 2):.2e} β†’ {self.config.min_lr:.2e}
...
```
**Benefits:**
- πŸ”“ **Escape solidified local minima** with aggressive LR spikes
- 🌊 **Wider exploration curves** after each restart
- πŸ’ͺ **Progressively stronger exploration** as training proceeds
- 🎯 **Combat training plateaus** that plague long runs
"""
run_card = f"""# Run: {self.config.run_name}
## Configuration
- **Dataset**: {self.config.dataset.upper()}
- **Fusion Mode**: {self.config.fusion_mode}
- **Parameters**: {trainer_stats['total_params']:,}
- **Simplex**: {self.config.k_simplex}-simplex ({self.config.k_simplex + 1} vertices)
## Performance
- **Best Validation Accuracy**: {trainer_stats['best_acc']:.2f}%
- **Training Time**: {trainer_stats['training_time']:.1f} hours
- **Final Epoch**: {trainer_stats['final_epoch']}
## Training Setup: AdamW + Warm Restarts
- **Optimizer**: AdamW (lr={self.config.learning_rate}, wd={self.config.weight_decay})
- **Scheduler**: CosineAnnealingWarmRestarts
- **Restart Period (T_0)**: {self.config.restart_period} epochs
- **Cycle Multiplier (T_mult)**: {self.config.restart_mult}x
- **Restart LR Mult**: {self.config.restart_lr_mult}x {'πŸš€' if self.config.restart_lr_mult > 1.0 else ''}
- **Min LR**: {self.config.min_lr}
- **Batch Size**: {self.config.batch_size}
- **Mixed Precision**: {trainer_stats.get('mixed_precision', False)}
{boost_section}
### Learning Rate Schedule
```
Cycle 1: Epochs 0-{self.config.restart_period}
LR: {self.config.learning_rate} β†’ {self.config.min_lr} (drop)
Expected: Convergence to local minimum
Epoch {self.config.restart_period}: RESTART πŸ”„
LR: {self.config.min_lr} β†’ {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} (jump{"!" if self.config.restart_lr_mult > 1.0 else ""})
Expected: Escape local minimum, explore new regions
Cycle 2: Epochs {self.config.restart_period}-{self.config.restart_period * (1 + self.config.restart_mult)}
LR: {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} β†’ {self.config.min_lr} (longer cycle)
Expected: Deeper convergence
... and so on
```
## Files
- `{self.run_prefix}/checkpoints/best_model.safetensors` - Model weights
- `{self.run_prefix}/checkpoints/best_training_state.pt` - Optimizer state
- `{self.run_prefix}/config.yaml` - Full configuration
- `{self.run_prefix}/tensorboard/` - TensorBoard logs (LR tracking!)
## Usage
```python
from safetensors.torch import load_file
from huggingface_hub import hf_hub_download
model_path = hf_hub_download(
repo_id="{self.repo_id}",
filename="{self.run_prefix}/checkpoints/best_model.safetensors"
)
state_dict = load_file(model_path)
model.load_state_dict(state_dict)
```
## Training Notes
**Warm Restarts Benefits:**
- πŸ”„ **Exploration**: Periodic LR jumps escape local minima
- πŸ“‰ **Exploitation**: Long drop phases converge deeply
- 🎯 **Robustness**: Multiple restarts find better solutions
- πŸ“Š **Monitoring**: Watch TensorBoard for restart effects!
**Expected Behavior:**
- Accuracy improves during each drop phase
- Brief accuracy dips after restarts (exploration)
- Overall upward trend across cycles
- Best models often found late in long cycles
---
Built with geometric consciousness-aware routing using the Devil's Staircase (Beatrix) and pentachoron parameterization.
**Training completed**: {time.strftime("%Y-%m-%d %H:%M:%S")}
[← Back to main repository](https://huggingface.co/{self.repo_id})
"""
readme_path = self.config.output_dir / "RUN_README.md"
with open(readme_path, 'w') as f:
f.write(run_card)
try:
upload_file(
path_or_fileobj=str(readme_path),
path_in_repo=f"{self.run_prefix}/README.md",
repo_id=self.repo_id,
token=self.config.hf_token
)
print(f"[HF] Uploaded run README")
except Exception as e:
print(f"[HF] Run README upload failed: {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Trainer with AdamW + CosineAnnealingWarmRestarts + LR Boost
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class Trainer:
"""Training manager with AdamW + Warm Restarts + LR Boost."""
def __init__(self, config: CantorTrainingConfig):
self.config = config
self.device = torch.device(config.device)
# Set seed
torch.manual_seed(config.seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(config.seed)
# Model
print("\n" + "=" * 70)
print(f"Initializing Cantor Classifier - {config.dataset.upper()}")
print("=" * 70)
init_start = time.time()
self.model = CantorClassifier(config).to(self.device)
init_time = time.time() - init_start
print(f"\n[Model] Initialization time: {init_time:.2f}s")
self.print_model_info()
# Track restart epochs for logging
self.restart_epochs = self._calculate_restart_epochs()
# Optimizer
self.optimizer = self.create_optimizer()
# Scheduler
self.scheduler = self.create_scheduler()
# Loss
self.criterion = nn.CrossEntropyLoss(label_smoothing=config.label_smoothing)
# Mixing info
self.use_mixing = config.use_mixing
self.mixing_type = config.mixing_type
self.mixing_prob = config.mixing_prob
# Mixed precision
self.use_amp = config.use_mixed_precision and config.device == "cuda"
self.scaler = GradScaler() if self.use_amp else None
if self.use_amp:
print(f"[Training] Mixed precision enabled")
# TensorBoard
self.writer = SummaryWriter(log_dir=str(config.tensorboard_dir))
print(f"[TensorBoard] Logging to: {config.tensorboard_dir}")
print(f"[Checkpoints] Format: SafeTensors (ClamAV safe)")
# HuggingFace
self.hf_uploader = HuggingFaceUploader(config) if config.upload_to_hf else None
# Save config
config.save(config.output_dir / "config.yaml")
# Metrics
self.best_acc = 0.0
self.global_step = 0
self.start_time = time.time()
self.upload_count = 0
def apply_mixing(self, images: torch.Tensor, labels: torch.Tensor):
"""Apply mixing augmentation if enabled."""
if not self.use_mixing or torch.rand(1).item() > self.mixing_prob:
return images, labels, None
if self.mixing_type == "alphamix":
mixed_images, y_a, y_b, alpha = alphamix_data(
images, labels,
alpha_range=self.config.mixing_alpha_range,
spatial_ratio=self.config.mixing_spatial_ratio
)
elif self.mixing_type == "fractal":
mixed_images, y_a, y_b, alpha = alphamix_fractal(
images, labels,
alpha_range=self.config.mixing_alpha_range,
steps_range=self.config.fractal_steps_range,
triad_scales=self.config.fractal_triad_scales
)
else:
raise ValueError(f"Unknown mixing type: {self.mixing_type}")
return mixed_images, (y_a, y_b, alpha), alpha
def compute_mixed_loss(self, logits: torch.Tensor, mixed_labels):
"""Compute loss for mixed labels."""
if mixed_labels is None:
# No mixing applied
return None
y_a, y_b, alpha = mixed_labels
loss_a = self.criterion(logits, y_a)
loss_b = self.criterion(logits, y_b)
# Weighted combination based on mixing ratio
# Use spatial_ratio for weighting (alpha represents transparency)
loss = alpha * loss_a + (1 - alpha) * loss_b
return loss
def _calculate_restart_epochs(self) -> List[int]:
"""Calculate when restarts will occur."""
if self.config.scheduler_type != "cosine_restarts":
return []
restarts = []
current = self.config.restart_period
period = self.config.restart_period
while current < self.config.num_epochs:
restarts.append(current)
period *= self.config.restart_mult
current += period
return restarts
def create_optimizer(self):
"""Create optimizer based on config."""
if self.config.optimizer_type == "sgd":
print(f"\n[Optimizer] SGD")
print(f" LR: {self.config.learning_rate}")
print(f" Momentum: {self.config.sgd_momentum}")
print(f" Nesterov: {self.config.sgd_nesterov}")
print(f" Weight decay: {self.config.weight_decay}")
return torch.optim.SGD(
self.model.parameters(),
lr=self.config.learning_rate,
momentum=self.config.sgd_momentum,
weight_decay=self.config.weight_decay,
nesterov=self.config.sgd_nesterov
)
elif self.config.optimizer_type == "adamw":
print(f"\n[Optimizer] AdamW")
print(f" LR: {self.config.learning_rate}")
print(f" Betas: {self.config.adamw_betas}")
print(f" Weight decay: {self.config.weight_decay}")
return torch.optim.AdamW(
self.model.parameters(),
lr=self.config.learning_rate,
betas=self.config.adamw_betas,
eps=self.config.adamw_eps,
weight_decay=self.config.weight_decay
)
else:
raise ValueError(f"Unknown optimizer: {self.config.optimizer_type}")
def create_scheduler(self):
"""Create LR scheduler based on config."""
if self.config.scheduler_type == "cosine_restarts":
print(f"\n[Scheduler] CosineAnnealingWarmRestarts with LR Boost")
print(f" T_0 (restart period): {self.config.restart_period} epochs")
print(f" T_mult (cycle multiplier): {self.config.restart_mult}x")
print(f" Restart LR mult: {self.config.restart_lr_mult}x {'πŸš€' if self.config.restart_lr_mult > 1.0 else ''}")
print(f" Min LR: {self.config.min_lr}")
if self.config.restart_lr_mult > 1.0:
print(f"\n πŸš€ BOOST MODE ENABLED!")
print(f" Baseline LR: {self.config.learning_rate:.2e}")
boosted_lrs = [self.config.learning_rate * (self.config.restart_lr_mult ** i) for i in range(1, min(4, len(self.restart_epochs) + 1))]
for i, lr in enumerate(boosted_lrs):
print(f" After restart #{i+1}: {lr:.2e} ({self.config.restart_lr_mult**(i+1):.2f}x)")
print(f" β†’ Creates wider exploration curves to escape local minima!")
print(f"\n Restart schedule:")
for i, epoch in enumerate(self.restart_epochs[:5]): # Show first 5
mult = self.config.restart_lr_mult ** (i + 1) if self.config.restart_lr_mult > 1.0 else 1.0
print(f" Restart #{i+1}: Epoch {epoch} (LR: {self.config.learning_rate * mult:.2e})")
if len(self.restart_epochs) > 5:
print(f" ... and {len(self.restart_epochs) - 5} more")
return CosineAnnealingWarmRestartsWithBoost(
self.optimizer,
T_0=self.config.restart_period,
T_mult=self.config.restart_mult,
eta_min=self.config.min_lr,
restart_lr_mult=self.config.restart_lr_mult
)
elif self.config.scheduler_type == "multistep":
print(f"\n[Scheduler] MultiStepLR")
print(f" Milestones: {self.config.lr_milestones}")
print(f" Gamma: {self.config.lr_gamma}")
return torch.optim.lr_scheduler.MultiStepLR(
self.optimizer,
milestones=self.config.lr_milestones,
gamma=self.config.lr_gamma
)
elif self.config.scheduler_type == "cosine":
print(f"\n[Scheduler] Cosine annealing with warmup")
print(f" Warmup epochs: {self.config.warmup_epochs}")
print(f" Min LR: {self.config.min_lr}")
def lr_lambda(epoch):
if epoch < self.config.warmup_epochs:
return (epoch + 1) / self.config.warmup_epochs
progress = (epoch - self.config.warmup_epochs) / (self.config.num_epochs - self.config.warmup_epochs)
return 0.5 * (1 + math.cos(math.pi * progress))
return torch.optim.lr_scheduler.LambdaLR(self.optimizer, lr_lambda)
else:
raise ValueError(f"Unknown scheduler: {self.config.scheduler_type}")
def print_model_info(self):
"""Print model info."""
total_params = sum(p.numel() for p in self.model.parameters())
print(f"\nParameters: {total_params:,}")
print(f"Dataset: {self.config.dataset.upper()}")
print(f"Classes: {self.config.num_classes}")
print(f"Fusion mode: {self.config.fusion_mode}")
print(f"Optimizer: {self.config.optimizer_type.upper()}")
print(f"Scheduler: {self.config.scheduler_type}")
if self.config.restart_lr_mult > 1.0:
print(f"LR Boost: {self.config.restart_lr_mult}x at restarts πŸš€")
if self.config.use_mixing:
print(f"Mixing: {self.config.mixing_type} (prob={self.config.mixing_prob})")
print(f"Output: {self.config.output_dir}")
def train_epoch(self, train_loader: DataLoader, epoch: int) -> Tuple[float, float]:
"""Train one epoch."""
self.model.train()
total_loss, correct, total = 0.0, 0, 0
mixing_applied_count = 0
total_batches = 0
# Check if this is a restart epoch
is_restart = (epoch in self.restart_epochs)
epoch_desc = f"Epoch {epoch+1}/{self.config.num_epochs}"
if is_restart:
restart_num = self.restart_epochs.index(epoch) + 1
boost_mult = self.config.restart_lr_mult ** restart_num if self.config.restart_lr_mult > 1.0 else 1.0
epoch_desc += f" πŸ”„ RESTART #{restart_num}"
if self.config.restart_lr_mult > 1.0:
epoch_desc += f" ({boost_mult:.2f}x)"
pbar = tqdm(train_loader, desc=f"{epoch_desc} [Train]")
for batch_idx, (images, labels) in enumerate(pbar):
images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True)
# Apply mixing augmentation
original_labels = labels
mixed_images, mixed_labels_info, mixing_alpha = self.apply_mixing(images, labels)
if mixing_alpha is not None:
mixing_applied_count += 1
images = mixed_images
total_batches += 1
# Forward
if self.use_amp:
with autocast():
logits = self.model(images)
# Compute loss (handle mixed labels)
if mixing_alpha is not None:
loss = self.compute_mixed_loss(logits, mixed_labels_info)
else:
loss = self.criterion(logits, labels)
self.optimizer.zero_grad(set_to_none=True)
self.scaler.scale(loss).backward()
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip)
self.scaler.step(self.optimizer)
self.scaler.update()
else:
logits = self.model(images)
# Compute loss (handle mixed labels)
if mixing_alpha is not None:
loss = self.compute_mixed_loss(logits, mixed_labels_info)
else:
loss = self.criterion(logits, labels)
self.optimizer.zero_grad(set_to_none=True)
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip)
self.optimizer.step()
# Metrics (use original labels for accuracy)
total_loss += loss.item()
_, predicted = logits.max(1)
correct += predicted.eq(original_labels).sum().item()
total += original_labels.size(0)
# TensorBoard logging
if batch_idx % self.config.log_interval == 0:
current_lr = self.scheduler.get_last_lr()[0]
self.writer.add_scalar('train/loss', loss.item(), self.global_step)
self.writer.add_scalar('train/accuracy', 100. * correct / total, self.global_step)
self.writer.add_scalar('train/learning_rate', current_lr, self.global_step)
if mixing_alpha is not None:
self.writer.add_scalar('train/mixing_alpha', mixing_alpha, self.global_step)
self.global_step += 1
postfix_dict = {
'loss': f'{loss.item():.4f}',
'acc': f'{100. * correct / total:.2f}%',
'lr': f'{self.scheduler.get_last_lr()[0]:.6f}'
}
if self.use_mixing:
mix_pct = 100.0 * mixing_applied_count / total_batches
postfix_dict['mix'] = f'{mix_pct:.0f}%'
pbar.set_postfix(postfix_dict)
return total_loss / len(train_loader), 100. * correct / total
@torch.no_grad()
def evaluate(self, val_loader: DataLoader, epoch: int) -> Tuple[float, Dict]:
"""Evaluate."""
self.model.eval()
total_loss, correct, total = 0.0, 0, 0
consciousness_values = []
pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{self.config.num_epochs} [Val] ")
for batch_idx, (images, labels) in enumerate(pbar):
images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True)
# Forward with fusion info on last batch
return_info = (batch_idx == len(val_loader) - 1)
if self.use_amp:
with autocast():
if return_info:
logits, fusion_infos = self.model(images, return_fusion_info=True)
if fusion_infos and fusion_infos[0].get('consciousness') is not None:
consciousness_values.append(fusion_infos[0]['consciousness'].mean().item())
else:
logits = self.model(images)
loss = self.criterion(logits, labels)
else:
if return_info:
logits, fusion_infos = self.model(images, return_fusion_info=True)
if fusion_infos and fusion_infos[0].get('consciousness') is not None:
consciousness_values.append(fusion_infos[0]['consciousness'].mean().item())
else:
logits = self.model(images)
loss = self.criterion(logits, labels)
total_loss += loss.item()
_, predicted = logits.max(1)
correct += predicted.eq(labels).sum().item()
total += labels.size(0)
pbar.set_postfix({
'loss': f'{total_loss / (batch_idx + 1):.4f}',
'acc': f'{100. * correct / total:.2f}%'
})
avg_loss = total_loss / len(val_loader)
accuracy = 100. * correct / total
# TensorBoard logging
self.writer.add_scalar('val/loss', avg_loss, epoch)
self.writer.add_scalar('val/accuracy', accuracy, epoch)
if consciousness_values:
self.writer.add_scalar('val/consciousness', sum(consciousness_values) / len(consciousness_values), epoch)
metrics = {
'loss': avg_loss,
'accuracy': accuracy,
'consciousness': sum(consciousness_values) / len(consciousness_values) if consciousness_values else None
}
return accuracy, metrics
def train(self, train_loader: DataLoader, val_loader: DataLoader):
"""Full training loop."""
print("\n" + "=" * 70)
print("Starting training with AdamW + Warm Restarts" + (" + LR Boost πŸš€" if self.config.restart_lr_mult > 1.0 else ""))
print(f"Optimizer: {self.config.optimizer_type.upper()}")
print(f"Scheduler: {self.config.scheduler_type}")
print(f"Restart period: {self.config.restart_period} epochs (T_0)")
print(f"Cycle multiplier: {self.config.restart_mult}x (T_mult)")
if self.config.restart_lr_mult > 1.0:
print(f"LR boost multiplier: {self.config.restart_lr_mult}x πŸš€")
print(f"Total restarts: {len(self.restart_epochs)}")
print("=" * 70 + "\n")
for epoch in range(self.config.num_epochs):
# Train
train_loss, train_acc = self.train_epoch(train_loader, epoch)
# Evaluate
val_acc, val_metrics = self.evaluate(val_loader, epoch)
# Update scheduler
self.scheduler.step()
# Check if this is a restart epoch or next epoch is a restart
is_restart = (epoch in self.restart_epochs)
next_is_restart = ((epoch + 1) in self.restart_epochs)
next_lr = self.scheduler.get_last_lr()[0]
# Print summary
print(f"\n{'='*70}")
print(f"Epoch [{epoch + 1}/{self.config.num_epochs}] Summary:")
print(f" Train: Loss={train_loss:.4f}, Acc={train_acc:.2f}%")
print(f" Val: Loss={val_metrics['loss']:.4f}, Acc={val_acc:.2f}%")
if val_metrics['consciousness']:
print(f" Consciousness: {val_metrics['consciousness']:.4f}")
if next_is_restart:
restart_num = self.restart_epochs.index(epoch + 1) + 1
boost_mult = self.config.restart_lr_mult ** restart_num if self.config.restart_lr_mult > 1.0 else 1.0
print(f" Next LR: {next_lr:.6f}")
print(f" ⚠️ RESTART COMING! Next epoch will jump to {next_lr * self.config.restart_lr_mult:.6f}")
if self.config.restart_lr_mult > 1.0:
print(f" πŸš€ Boosted exploration: {boost_mult:.2f}x baseline!")
print(f" (Breaking out of solidified local minima)")
elif is_restart:
restart_num = self.restart_epochs.index(epoch) + 1
boost_mult = self.config.restart_lr_mult ** restart_num if self.config.restart_lr_mult > 1.0 else 1.0
print(f" πŸ”„ WARM RESTART #{restart_num}! Current LR: {next_lr:.6f}")
if self.config.restart_lr_mult > 1.0:
print(f" πŸš€ Exploration boost: {boost_mult:.2f}x baseline")
print(f" (Wider curve for aggressive exploration)")
else:
print(f" Current LR: {next_lr:.6f}")
# Checkpoint logic
is_best = val_acc > self.best_acc
should_save_regular = ((epoch + 1) % self.config.save_interval == 0)
should_upload_regular = ((epoch + 1) % self.config.checkpoint_upload_interval == 0)
if is_best:
self.best_acc = val_acc
print(f" βœ“ New best model! Accuracy: {val_acc:.2f}%")
self.save_checkpoint(epoch, val_acc, prefix="best", upload=should_upload_regular)
if should_save_regular:
self.save_checkpoint(epoch, val_acc, prefix=f"epoch_{epoch+1}", upload=should_upload_regular)
print(f" HF Uploads: {self.upload_count}")
print(f"{'='*70}\n")
# Flush TensorBoard
if (epoch + 1) % 10 == 0:
self.writer.flush()
# Training complete
training_time = (time.time() - self.start_time) / 3600
print("\n" + "=" * 70)
print("Training Complete!")
print(f"Best Validation Accuracy: {self.best_acc:.2f}%")
print(f"Training Time: {training_time:.2f} hours")
print(f"Total Uploads: {self.upload_count}")
print(f"Warm Restarts: {len(self.restart_epochs)}")
if self.config.restart_lr_mult > 1.0:
print(f"LR Boost: {self.config.restart_lr_mult}x (helped escape local minima! πŸš€)")
print("=" * 70)
# Upload to HuggingFace
if self.hf_uploader:
print("\n[HF] Uploading final best model...")
best_model_path = self.config.checkpoint_dir / "best_model.safetensors"
best_state_path = self.config.checkpoint_dir / "best_training_state.pt"
best_metadata_path = self.config.checkpoint_dir / "best_metadata.json"
config_path = self.config.output_dir / "config.yaml"
if best_model_path.exists():
self.hf_uploader.upload_file(best_model_path, "checkpoints/best_model.safetensors")
if best_state_path.exists():
self.hf_uploader.upload_file(best_state_path, "checkpoints/best_training_state.pt")
if best_metadata_path.exists():
self.hf_uploader.upload_file(best_metadata_path, "checkpoints/best_metadata.json")
if config_path.exists():
self.hf_uploader.upload_file(config_path, "config.yaml")
print("[HF] Final upload: TensorBoard logs...")
self.hf_uploader.upload_folder_contents(self.config.tensorboard_dir, "tensorboard")
trainer_stats = {
'total_params': sum(p.numel() for p in self.model.parameters()),
'best_acc': self.best_acc,
'training_time': training_time,
'final_epoch': self.config.num_epochs,
'batch_size': self.config.batch_size,
'mixed_precision': self.use_amp
}
self.hf_uploader.create_model_card(trainer_stats)
self.writer.close()
def save_checkpoint(self, epoch: int, accuracy: float, prefix: str = "checkpoint", upload: bool = False):
"""Save checkpoint as safetensors with selective upload."""
checkpoint_dir = self.config.checkpoint_dir
checkpoint_dir.mkdir(parents=True, exist_ok=True)
# 1. Save model weights as safetensors
model_path = checkpoint_dir / f"{prefix}_model.safetensors"
save_file(self.model.state_dict(), str(model_path))
# 2. Save optimizer/scheduler state
training_state = {
'optimizer_state_dict': self.optimizer.state_dict(),
'scheduler_state_dict': self.scheduler.state_dict(),
}
if self.scaler is not None:
training_state['scaler_state_dict'] = self.scaler.state_dict()
training_state_path = checkpoint_dir / f"{prefix}_training_state.pt"
torch.save(training_state, training_state_path)
# 3. Save metadata
metadata = {
'epoch': epoch,
'accuracy': accuracy,
'best_accuracy': self.best_acc,
'global_step': self.global_step,
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
'optimizer': self.config.optimizer_type,
'scheduler': self.config.scheduler_type,
'learning_rate': self.scheduler.get_last_lr()[0],
'restart_lr_mult': self.config.restart_lr_mult
}
metadata_path = checkpoint_dir / f"{prefix}_metadata.json"
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
is_best = (prefix == "best")
if is_best:
print(f" πŸ’Ύ Saved best: {prefix}_model.safetensors")
else:
print(f" πŸ’Ύ Saved: {prefix}_model.safetensors", end="")
# Upload to HuggingFace
if self.hf_uploader and upload:
self.hf_uploader.upload_file(model_path, f"checkpoints/{prefix}_model.safetensors")
self.hf_uploader.upload_file(training_state_path, f"checkpoints/{prefix}_training_state.pt")
self.hf_uploader.upload_file(metadata_path, f"checkpoints/{prefix}_metadata.json")
if is_best:
config_path = self.config.output_dir / "config.yaml"
if config_path.exists():
self.hf_uploader.upload_file(config_path, "config.yaml")
self.upload_count += 1
if not is_best:
print(" β†’ Uploaded to HF")
else:
if not is_best:
print(" (local only)")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Data Loading (with Cutout)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class Cutout:
"""Cutout data augmentation."""
def __init__(self, length: int):
self.length = length
def __call__(self, img):
h, w = img.size(1), img.size(2)
mask = torch.ones((h, w), dtype=torch.float32)
y = torch.randint(h, (1,)).item()
x = torch.randint(w, (1,)).item()
y1 = max(0, y - self.length // 2)
y2 = min(h, y + self.length // 2)
x1 = max(0, x - self.length // 2)
x2 = min(w, x + self.length // 2)
mask[y1:y2, x1:x2] = 0.
mask = mask.expand_as(img)
return img * mask
def get_data_loaders(config: CantorTrainingConfig) -> Tuple[DataLoader, DataLoader]:
"""Create data loaders."""
# Normalization
mean = (0.4914, 0.4822, 0.4465)
std = (0.2470, 0.2435, 0.2616)
# Augmentation
if config.use_augmentation:
transforms_list = []
if config.use_autoaugment:
policy = transforms.AutoAugmentPolicy.CIFAR10
transforms_list.append(transforms.AutoAugment(policy))
else:
transforms_list.extend([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
])
transforms_list.append(transforms.ToTensor())
transforms_list.append(transforms.Normalize(mean, std))
if config.use_cutout:
transforms_list.append(Cutout(config.cutout_length))
train_transform = transforms.Compose(transforms_list)
else:
train_transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean, std)
])
val_transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean, std)
])
# Dataset selection
if config.dataset == "cifar10":
train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=train_transform)
val_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=val_transform)
elif config.dataset == "cifar100":
train_dataset = datasets.CIFAR100(root='./data', train=True, download=True, transform=train_transform)
val_dataset = datasets.CIFAR100(root='./data', train=False, download=True, transform=val_transform)
else:
raise ValueError(f"Unknown dataset: {config.dataset}")
train_loader = DataLoader(
train_dataset,
batch_size=config.batch_size,
shuffle=True,
num_workers=config.num_workers,
pin_memory=(config.device == "cuda")
)
val_loader = DataLoader(
val_dataset,
batch_size=config.batch_size,
shuffle=False,
num_workers=config.num_workers,
pin_memory=(config.device == "cuda")
)
return train_loader, val_loader
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Main - AdamW + CosineAnnealingWarmRestarts + LR Boost
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def main():
"""Main training function with AdamW + Warm Restarts + LR Boost."""
# ═══════════════════════════════════════════════════════════════════
# Configuration - AdamW with Cosine Annealing Warm Restarts + LR BOOST
# ═══════════════════════════════════════════════════════════════════
config = CantorTrainingConfig(
# Dataset
dataset="cifar100",
# Architecture
embed_dim=512,
num_fusion_blocks=12,
num_heads=8,
fusion_mode="consciousness",
k_simplex=4,
use_beatrix=True,
fusion_window=32,
# Optimizer: AdamW
optimizer_type="adamw",
learning_rate=1e-4,
weight_decay=0.005, # Stronger regularization
adamw_betas=(0.9, 0.999),
# Scheduler: Cosine Annealing with Warm Restarts + LR BOOST
scheduler_type="cosine_restarts",
restart_period=40,
restart_mult=1.5, # Consistent cycle growth
restart_lr_mult=1.25, # πŸš€ NEW! Boost LR at restarts
min_lr=1e-7,
# Training
num_epochs=200,
batch_size=256,
grad_clip=1.0,
label_smoothing=0.15,
# Augmentation
use_augmentation=True,
use_autoaugment=True,
use_cutout=True,
cutout_length=16,
# Mixing augmentation (AlphaMix)
use_mixing=True, # Enable mixing
mixing_type="alphamix", # "alphamix" or "fractal"
mixing_alpha_range=(0.3, 0.7),
mixing_spatial_ratio=0.25,
mixing_prob=0.5, # Apply to 50% of batches
# Regularization
dropout=0.1,
drop_path_rate=0.15,
# System
device="cuda",
use_mixed_precision=False,
# HuggingFace
hf_username="AbstractPhil",
upload_to_hf=True,
checkpoint_upload_interval=25,
)
print("=" * 70)
print(f"Cantor Fusion Classifier - {config.dataset.upper()}")
print("Training Strategy: AdamW + Cosine Annealing Warm Restarts")
if config.restart_lr_mult > 1.0:
print("πŸš€ WITH LR BOOST AT RESTARTS πŸš€")
print("=" * 70)
print(f"\nConfiguration:")
print(f" Dataset: {config.dataset}")
print(f" Fusion mode: {config.fusion_mode}")
print(f" Optimizer: AdamW")
print(f" Scheduler: CosineAnnealingWarmRestarts")
print(f" Initial LR: {config.learning_rate}")
print(f" Min LR: {config.min_lr}")
print(f" Restart period (T_0): {config.restart_period} epochs")
print(f" Cycle multiplier (T_mult): {config.restart_mult}x")
if config.restart_lr_mult > 1.0:
print(f" πŸš€ Restart LR mult: {config.restart_lr_mult}x (BOOST MODE!)")
if config.use_mixing:
print(f" 🎨 Mixing: {config.mixing_type} (prob={config.mixing_prob})")
print(f" Total epochs: {config.num_epochs}")
# Calculate restart schedule
restarts = []
current = config.restart_period
period = config.restart_period
while current < config.num_epochs:
restarts.append(current)
period *= config.restart_mult
current += period
print(f"\n Restart schedule ({len(restarts)} restarts):")
for i, epoch in enumerate(restarts[:5]):
boost_mult = config.restart_lr_mult ** (i + 1) if config.restart_lr_mult > 1.0 else 1.0
lr = config.learning_rate * boost_mult
boost_str = f" ({boost_mult:.2f}x πŸš€)" if config.restart_lr_mult > 1.0 else ""
print(f" Restart #{i+1}: Epoch {epoch} β†’ LR: {lr:.2e}{boost_str}")
if len(restarts) > 5:
print(f" ... and {len(restarts) - 5} more")
print(f"\n Output: {config.output_dir}")
print(f" HuggingFace: {'Enabled' if config.upload_to_hf else 'Disabled'}")
if config.upload_to_hf:
print(f" Repo: {config.hf_username}/{config.hf_repo_name}")
print(f" Run: {config.run_name}")
if config.restart_lr_mult > 1.0:
print("\n" + "=" * 70)
print("πŸš€ LR BOOST MODE - Expected Training Behavior:")
print("=" * 70)
print(f"πŸ“‰ Cycle 1 (epochs 0-{config.restart_period}):")
print(f" LR: {config.learning_rate:.2e} β†’ {config.min_lr:.2e} (smooth drop)")
print(" Expected: Convergence to local minimum")
print("")
print(f"πŸ”„ Epoch {config.restart_period}: RESTART WITH BOOST!")
boosted_lr = config.learning_rate * config.restart_lr_mult
print(f" LR: {config.min_lr:.2e} β†’ {boosted_lr:.2e} ({config.restart_lr_mult}x BOOST!)")
print(" Expected: AGGRESSIVE exploration, escape local minimum")
print(f" Benefit: Wider curve ({(config.restart_lr_mult - 1) * 100:.0f}% more exploration)")
print("")
print(f"πŸ“‰ Cycle 2 (epochs {config.restart_period}-{int(config.restart_period * (1 + config.restart_mult))}):")
print(f" LR: {boosted_lr:.2e} β†’ {config.min_lr:.2e} (longer cycle)")
print(" Expected: Deeper convergence from better starting point")
print("")
print(f"πŸ”„ Epoch {int(config.restart_period * (1 + config.restart_mult))}: EVEN BIGGER BOOST!")
boosted_lr2 = config.learning_rate * (config.restart_lr_mult ** 2)
print(f" LR: {config.min_lr:.2e} β†’ {boosted_lr2:.2e} ({config.restart_lr_mult**2:.2f}x!)")
print(" Expected: VERY aggressive exploration")
print("")
print("🎯 Benefits:")
print(" - Escape solidified local minima with LR spikes")
print(" - Each restart explores WIDER than baseline")
print(" - Progressive boost helps late-training plateaus")
print(" - Automatic fracturing of failure modes")
print("=" * 70)
# Load data
print("\nLoading data...")
train_loader, val_loader = get_data_loaders(config)
print(f" Train: {len(train_loader.dataset)} samples")
print(f" Val: {len(val_loader.dataset)} samples")
# Train
trainer = Trainer(config)
trainer.train(train_loader, val_loader)
print("\n" + "=" * 70)
print("🎯 Training complete!")
if config.restart_lr_mult > 1.0:
print(" Check TensorBoard to see the BOOSTED warm restart cycles!")
else:
print(" Check TensorBoard to see the warm restart cycles!")
print(f" tensorboard --logdir {config.tensorboard_dir}")
print("")
print(" Look for:")
print(" - Smooth LR drops during each cycle")
if config.restart_lr_mult > 1.0:
print(" - πŸš€ BOOSTED LR jumps at restart epochs")
print(" - Wider exploration curves after restarts")
else:
print(" - Sharp LR jumps at restart epochs")
print(" - Accuracy improvements across cycles")
print("=" * 70)
if __name__ == "__main__":
main()