# train_cantor_fusion_hf.py - PRODUCTION WITH ADAMW + WARM RESTARTS + LR BOOST
"""
Cantor Fusion Classifier with AdamW + Cosine Warm Restarts + LR Boost
----------------------------------------------------------------------
Features:
- AdamW optimizer (a strong default for ViTs)
- CosineAnnealingWarmRestarts with configurable LR boost at restarts
- restart_lr_mult: Multiply LR at restart points for aggressive exploration
- HuggingFace Hub uploads (ONE shared repo, organized by run)
- TensorBoard logging (loss, accuracy, fusion metrics, LR tracking)
- Easy CIFAR-10/100 switching
- Automatic checkpoint management
- SafeTensors format (ClamAV safe)

New Feature: restart_lr_mult
When restart_lr_mult > 1.0, the learning rate at each restart is BOOSTED:
- Normal: 3e-4 → 1e-7 → restart at 3e-4
- Boosted (1.5x): 3e-4 → 1e-7 → restart at 4.5e-4 → 1e-7
- Creates wider exploration curves to escape solidified local minima

Author: AbstractPhil
License: MIT
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
# NOTE: torch.cuda.amp is deprecated in recent PyTorch in favor of torch.amp;
# kept here for compatibility with the rest of this script.
from torch.cuda.amp import autocast, GradScaler
from safetensors.torch import save_file, load_file

import math
import os
import json
from typing import Optional, Dict, List, Tuple, Union
from dataclasses import dataclass, asdict
import time
from pathlib import Path
from tqdm import tqdm

# HuggingFace
from huggingface_hub import HfApi, create_repo, upload_folder, upload_file
import yaml

# Import from your repo
from geovocab2.train.model.layers.attention.cantor_multiheaded_fusion import (
    CantorMultiheadFusion,
    CantorFusionConfig
)
from geovocab2.shapes.factory.cantor_route_factory import (
    CantorRouteFactory,
    RouteMode,
    SimplexConfig
)
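# The restart arithmetic described in the docstring above can be previewed
# without touching an optimizer. Illustration-only sketch (not called anywhere
# in this script): restarts land at cumulative sums of T_0, T_0*T_mult, ...,
# and the peak LR after the k-th restart is base_lr * restart_lr_mult ** k.
def preview_boost_schedule(base_lr=3e-4, t_0=50, t_mult=2.0,
                           restart_lr_mult=1.5, num_epochs=350):
    """Return [(restart_epoch, peak_lr_after_restart), ...]."""
    schedule = []
    epoch, period, k = t_0, float(t_0), 1
    while epoch < num_epochs:
        schedule.append((int(epoch), base_lr * restart_lr_mult ** k))
        period *= t_mult
        epoch += period
        k += 1
    return schedule

# usage: preview_boost_schedule() -> [(50, 4.5e-04), (150, 6.75e-04)],
# matching the docstring example (cycles of 50, then 100, then 200 epochs).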
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Mixing Augmentations (AlphaMix / Fractal AlphaMix)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

def alphamix_data(x, y, alpha_range=(0.3, 0.7), spatial_ratio=0.25):
    """
    Standard AlphaMix: Single spatially localized transparent overlay.

    Args:
        x: Input images [B, C, H, W]
        y: Labels [B]
        alpha_range: Range for transparency sampling
        spatial_ratio: Ratio of image area to overlay

    Returns:
        composited_x: Mixed images
        y_a: Original labels
        y_b: Mixed labels
        alpha: Patch transparency (only ~spatial_ratio of the image is mixed,
            so this is not an area-weighted mixing coefficient)
    """
    batch_size = x.size(0)
    index = torch.randperm(batch_size, device=x.device)
    y_a, y_b = y, y[index]

    # Sample alpha from Beta distribution
    alpha_min, alpha_max = alpha_range
    beta_sample = torch.distributions.Beta(2.0, 2.0).sample().item()
    alpha = alpha_min + (alpha_max - alpha_min) * beta_sample

    # Compute overlay region
    _, _, H, W = x.shape
    overlay_ratio = torch.sqrt(torch.tensor(spatial_ratio)).item()
    overlay_h = int(H * overlay_ratio)
    overlay_w = int(W * overlay_ratio)
    top = torch.randint(0, H - overlay_h + 1, (1,), device=x.device).item()
    left = torch.randint(0, W - overlay_w + 1, (1,), device=x.device).item()

    # Blend: alpha of the original stays, (1 - alpha) of the shuffled partner overlays
    composited_x = x.clone()
    overlay_region = alpha * x[:, :, top:top+overlay_h, left:left+overlay_w]
    background_region = (1 - alpha) * x[index, :, top:top+overlay_h, left:left+overlay_w]
    composited_x[:, :, top:top+overlay_h, left:left+overlay_w] = overlay_region + background_region

    return composited_x, y_a, y_b, alpha


def alphamix_fractal(
    x: torch.Tensor,
    y: torch.Tensor,
    alpha_range=(0.3, 0.7),
    steps_range=(1, 3),
    triad_scales=(1/3, 1/9, 1/27),
    beta_shape=(2.0, 2.0),
    seed: Optional[int] = None,
):
    """
    Fractal AlphaMix: Triadic multi-patch overlays aligned to Cantor geometry.
    Pure torch, GPU-compatible.

    Args:
        x: Input images [B, C, H, W]
        y: Labels [B]
        alpha_range: Range for transparency sampling
        steps_range: Range for number of patches to apply
        triad_scales: Triadic scales (1/3, 1/9, 1/27 for Cantor-like)
        beta_shape: Beta distribution parameters for sampling
        seed: Optional random seed

    Returns:
        x_mix: Mixed images
        y_a: Original labels
        y_b: Mixed labels
        alpha_eff: Effective area-weighted mixing coefficient
    """
    if seed is not None:
        torch.manual_seed(seed)

    B, C, H, W = x.shape
    device = x.device

    # Permutation for mixing
    idx = torch.randperm(B, device=device)
    y_a, y_b = y, y[idx]
    x_mix = x.clone()
    total_area = H * W

    # Beta distribution for transparency sampling
    k1, k2 = beta_shape
    beta_dist = torch.distributions.Beta(k1, k2)
    alpha_min, alpha_max = alpha_range

    # Storage for effective alpha calculation
    alpha_elems = []
    area_weights = []

    # Sample number of patches (same for all images in batch)
    steps = torch.randint(steps_range[0], steps_range[1] + 1, (1,), device=device).item()

    for _ in range(steps):
        # Choose triadic scale
        scale_idx = torch.randint(0, len(triad_scales), (1,), device=device).item()
        scale = triad_scales[scale_idx]

        # Compute patch dimensions (triadic area)
        patch_area = max(1, int(total_area * scale))
        side = int(torch.sqrt(torch.tensor(patch_area, dtype=torch.float32)).item())
        h = max(1, min(H, side))
        w = max(1, min(W, side))

        # Random position
        top = torch.randint(0, H - h + 1, (1,), device=device).item()
        left = torch.randint(0, W - w + 1, (1,), device=device).item()

        # Sample transparency from Beta distribution
        alpha_raw = beta_dist.sample().item()
        alpha = alpha_min + (alpha_max - alpha_min) * alpha_raw

        # Track for effective alpha
        alpha_elems.append(alpha)
        area_weights.append(h * w)

        # Blend patches
        fg = alpha * x[:, :, top:top + h, left:left + w]
        bg = (1 - alpha) * x[idx, :, top:top + h, left:left + w]
        x_mix[:, :, top:top + h, left:left + w] = fg + bg

    # Compute area-weighted effective alpha
    alpha_t = torch.tensor(alpha_elems, dtype=torch.float32, device=device)
    area_t = torch.tensor(area_weights, dtype=torch.float32, device=device)
    alpha_eff = (alpha_t * area_t).sum() / (area_t.sum() + 1e-12)
    alpha_eff = alpha_eff.item()

    return x_mix, y_a, y_b, alpha_eff
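# Hedged sketch, not used by the trainer: compute_mixed_loss() further down
# weights the two CE terms by the patch transparency alpha alone, while only
# part of the image is actually mixed. Under that observation, an area-aware
# weight for the ORIGINAL labels would fold in spatial_ratio as below; the
# helper name and formula are an assumption, not part of the training path.
def area_aware_label_weight(alpha: float, spatial_ratio: float = 0.25) -> float:
    """Weight for y_a when the partner image covers spatial_ratio of the pixels
    at weight (1 - alpha); y_b would get 1 minus this value."""
    return 1.0 - (1.0 - alpha) * spatial_ratio

# usage (on random data):
#   x, y = torch.randn(8, 3, 32, 32), torch.randint(0, 10, (8,))
#   x_mix, y_a, y_b, alpha = alphamix_data(x, y)
#   w_a = area_aware_label_weight(alpha)  # alpha=0.5 -> w_a=0.875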
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Custom Scheduler with LR Boost at Restarts
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

class CosineAnnealingWarmRestartsWithBoost(torch.optim.lr_scheduler._LRScheduler):
    """
    Cosine Annealing with Warm Restarts and optional LR boost at restart points.

    At each restart, the max LR is multiplied by `restart_lr_mult`, creating
    wider exploration curves that can help escape solidified local minima.

    Args:
        optimizer: Wrapped optimizer
        T_0: Number of iterations for the first restart
        T_mult: Factor to increase T_i after each restart (default: 1)
        eta_min: Minimum learning rate (default: 0)
        restart_lr_mult: Multiply max LR by this at each restart (default: 1.0)
            Values > 1.0 create boosted exploration cycles
        last_epoch: The index of last epoch (default: -1)

    Example:
        >>> scheduler = CosineAnnealingWarmRestartsWithBoost(
        ...     optimizer, T_0=50, T_mult=2, restart_lr_mult=1.5
        ... )
        # Cycle 1: 3e-4 → 1e-7 (50 epochs)
        # Restart: LR jumps to 4.5e-4 (1.5x boost)
        # Cycle 2: 4.5e-4 → 1e-7 (100 epochs)
        # Restart: LR jumps to 6.75e-4 (1.5x boost again)
        # Cycle 3: 6.75e-4 → 1e-7 (200 epochs)
    """

    def __init__(
        self,
        optimizer: torch.optim.Optimizer,
        T_0: int,
        T_mult: float = 1,
        eta_min: float = 0,
        restart_lr_mult: float = 1.0,
        last_epoch: int = -1
    ):
        if T_0 <= 0 or not isinstance(T_0, int):
            raise ValueError(f"Expected positive integer T_0, but got {T_0}")
        if T_mult < 1:
            raise ValueError(f"Expected T_mult >= 1, but got {T_mult}")
        if restart_lr_mult <= 0:
            raise ValueError(f"Expected positive restart_lr_mult, but got {restart_lr_mult}")

        self.T_0 = T_0
        self.T_i = T_0
        self.T_mult = T_mult
        self.eta_min = eta_min
        self.restart_lr_mult = restart_lr_mult
        self.T_cur = last_epoch

        # Track boosted base LRs and restart count
        self.current_base_lrs = None
        self.restart_count = 0

        super().__init__(optimizer, last_epoch)

    def get_lr(self):
        if self.T_cur == -1:
            # First step - return base LRs
            return self.base_lrs

        # Use boosted base LRs if we've had restarts
        if self.current_base_lrs is None:
            base_lrs_to_use = self.base_lrs
        else:
            base_lrs_to_use = self.current_base_lrs

        # Cosine annealing from current base LR to eta_min
        return [
            self.eta_min + (base_lr - self.eta_min) *
            (1 + math.cos(math.pi * self.T_cur / self.T_i)) / 2
            for base_lr in base_lrs_to_use
        ]

    def step(self, epoch=None):
        if epoch is None and self.last_epoch < 0:
            epoch = 0

        if epoch is None:
            epoch = self.last_epoch + 1
            self.T_cur = self.T_cur + 1

            # Check if we hit a restart point
            if self.T_cur >= self.T_i:
                # APPLY BOOST HERE before reset
                self.restart_count += 1
                if self.current_base_lrs is None:
                    self.current_base_lrs = list(self.base_lrs)

                # Boost the base LRs
                self.current_base_lrs = [
                    base_lr * self.restart_lr_mult
                    for base_lr in self.current_base_lrs
                ]

                # Now reset cycle
                self.T_cur = self.T_cur - self.T_i
                self.T_i = int(self.T_i * self.T_mult)
        else:
            if epoch < 0:
                raise ValueError(f"Expected non-negative epoch, but got {epoch}")
            if epoch >= self.T_0:
                if self.T_mult == 1:
                    self.T_cur = epoch % self.T_0
                    # Count how many restarts have occurred
                    self.restart_count = epoch // self.T_0
                else:
                    n = int(math.log((epoch / self.T_0 * (self.T_mult - 1) + 1), self.T_mult))
                    self.restart_count = n
                    self.T_cur = epoch - self.T_0 * (self.T_mult ** n - 1) / (self.T_mult - 1)
                    self.T_i = self.T_0 * self.T_mult ** n
                # Recompute the cumulative boost unconditionally, so a run
                # resumed via step(epoch) always matches restart_count:
self.current_base_lrs = [ base_lr * (self.restart_lr_mult ** self.restart_count) for base_lr in self.base_lrs ] else: self.T_i = self.T_0 self.T_cur = epoch self.last_epoch = math.floor(epoch) for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()): param_group['lr'] = lr self._last_lr = [group['lr'] for group in self.optimizer.param_groups] # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Configuration # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ @dataclass class CantorTrainingConfig: """Complete configuration for Cantor fusion training with AdamW + Warm Restarts.""" # Dataset dataset: str = "cifar10" # "cifar10" or "cifar100" num_classes: int = 10 # Architecture image_size: int = 32 patch_size: int = 4 embed_dim: int = 384 num_fusion_blocks: int = 6 num_heads: int = 8 fusion_window: int = 32 fusion_mode: str = "weighted" # "weighted" or "consciousness" k_simplex: int = 4 use_beatrix: bool = False beatrix_tau: float = 0.25 # Optimization precompute_geometric: bool = True use_torch_compile: bool = True use_mixed_precision: bool = False # Regularization dropout: float = 0.1 drop_path_rate: float = 0.1 label_smoothing: float = 0.1 # Training - Optimizer (AdamW) optimizer_type: str = "adamw" # "sgd" or "adamw" batch_size: int = 128 num_epochs: int = 300 learning_rate: float = 3e-4 # AdamW default weight_decay: float = 0.05 grad_clip: float = 1.0 # SGD-specific (if needed) sgd_momentum: float = 0.9 sgd_nesterov: bool = True # AdamW-specific adamw_betas: Tuple[float, float] = (0.9, 0.999) adamw_eps: float = 1e-8 # Learning rate schedule - WARM RESTARTS WITH BOOST scheduler_type: str = "cosine_restarts" # "multistep", "cosine", "cosine_restarts" # CosineAnnealingWarmRestarts parameters restart_period: int = 50 # T_0: epochs until first restart restart_mult: float = 2.0 # T_mult: multiply period after each restart (can be float like 1.5) restart_lr_mult: float = 1.0 # NEW: LR multiplier at restarts (>1.0 for boosted exploration) min_lr: float = 1e-7 # eta_min: minimum learning rate # MultiStepLR (for SGD fallback) lr_milestones: List[int] = None lr_gamma: float = 0.2 # Cosine annealing (regular, no restarts) warmup_epochs: int = 0 # Data augmentation use_augmentation: bool = True use_autoaugment: bool = True use_cutout: bool = False cutout_length: int = 16 # Mixing augmentation (AlphaMix / Fractal AlphaMix) use_mixing: bool = False mixing_type: str = "alphamix" # "alphamix" or "fractal" mixing_alpha_range: Tuple[float, float] = (0.3, 0.7) mixing_spatial_ratio: float = 0.25 # For standard alphamix mixing_prob: float = 1.0 # Probability of applying mixing # Fractal AlphaMix specific fractal_steps_range: Tuple[int, int] = (1, 3) fractal_triad_scales: Tuple[float, ...] 
= (1/3, 1/9, 1/27)

    # System
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    num_workers: int = 8
    seed: int = 42

    # Paths
    weights_dir: str = "weights"
    model_name: str = "vit-beans-v3"
    run_name: Optional[str] = None  # Auto-generated if None

    # HuggingFace - ONE SHARED REPO
    hf_username: str = "AbstractPhil"
    hf_repo_name: Optional[str] = None
    upload_to_hf: bool = True
    hf_token: Optional[str] = None

    # Logging
    log_interval: int = 50
    save_interval: int = 10
    checkpoint_upload_interval: int = 20

    def __post_init__(self):
        # Auto-set num_classes based on dataset
        if self.dataset == "cifar10":
            self.num_classes = 10
        elif self.dataset == "cifar100":
            self.num_classes = 100
        else:
            raise ValueError(f"Unknown dataset: {self.dataset}")

        # Set default milestones if None (for multistep fallback)
        if self.lr_milestones is None:
            if self.num_epochs >= 200:
                self.lr_milestones = [60, 120, 160]
            elif self.num_epochs >= 100:
                self.lr_milestones = [30, 60, 80]
            else:
                self.lr_milestones = [
                    int(self.num_epochs * 0.5),
                    int(self.num_epochs * 0.75)
                ]

        # Auto-generate run name
        if self.run_name is None:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            opt_name = self.optimizer_type.upper()
            sched_name = "WarmRestart" if self.scheduler_type == "cosine_restarts" else self.scheduler_type
            boost_str = f"_boost{self.restart_lr_mult}x" if self.restart_lr_mult > 1.0 else ""
            self.run_name = f"{self.dataset}_{self.fusion_mode}_{opt_name}_{sched_name}{boost_str}_{timestamp}"

        # ONE SHARED REPO for all runs
        if self.hf_repo_name is None:
            self.hf_repo_name = self.model_name

        # Set HF token from environment if not provided
        if self.hf_token is None:
            self.hf_token = os.environ.get("HF_TOKEN")

        # Calculate derived values
        assert self.image_size % self.patch_size == 0
        self.num_patches = (self.image_size // self.patch_size) ** 2
        self.patch_dim = self.patch_size * self.patch_size * 3

        # Create paths
        self.output_dir = Path(self.weights_dir) / self.model_name / self.run_name
        self.checkpoint_dir = self.output_dir / "checkpoints"
        self.tensorboard_dir = self.output_dir / "tensorboard"

        # Create directories
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.tensorboard_dir.mkdir(parents=True, exist_ok=True)

    def save(self, path: Union[str, Path]):
        """Save config to YAML file."""
        path = Path(path)
        config_dict = asdict(self)
        # Convert ALL tuple-typed fields to lists so yaml.safe_load can read
        # the file back (raw tuples get tagged !!python/tuple, which
        # safe_load rejects - converting only adamw_betas is not enough)
        for key, value in config_dict.items():
            if isinstance(value, tuple):
                config_dict[key] = list(value)
        with open(path, 'w') as f:
            yaml.dump(config_dict, f, default_flow_style=False)

    @classmethod
    def load(cls, path: Union[str, Path]):
        """Load config from YAML file."""
        path = Path(path)
        with open(path, 'r') as f:
            config_dict = yaml.safe_load(f)
        # Convert list-valued tuple fields back to tuples
        for key in ('adamw_betas', 'mixing_alpha_range',
                    'fractal_steps_range', 'fractal_triad_scales'):
            if key in config_dict and config_dict[key] is not None:
                config_dict[key] = tuple(config_dict[key])
        return cls(**config_dict)


# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Model Components (unchanged from previous version)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

class PatchEmbedding(nn.Module):
    """Patch embedding layer."""
    def __init__(self, config: CantorTrainingConfig):
        super().__init__()
        self.config = config
        self.proj = nn.Conv2d(3, config.embed_dim, kernel_size=config.patch_size, stride=config.patch_size)
        self.pos_embed = nn.Parameter(torch.randn(1, config.num_patches, config.embed_dim) * 0.02)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.proj(x)
        x = x.flatten(2).transpose(1, 2)
        x = x + self.pos_embed
        return x


class
DropPath(nn.Module): """Stochastic depth.""" def __init__(self, drop_prob: float = 0.0): super().__init__() self.drop_prob = drop_prob def forward(self, x): if self.drop_prob == 0. or not self.training: return x keep_prob = 1 - self.drop_prob shape = (x.shape[0],) + (1,) * (x.ndim - 1) random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device) random_tensor.floor_() return x.div(keep_prob) * random_tensor class CantorFusionBlock(nn.Module): """Cantor fusion block.""" def __init__(self, config: CantorTrainingConfig, drop_path: float = 0.0): super().__init__() self.norm1 = nn.LayerNorm(config.embed_dim) fusion_config = CantorFusionConfig( dim=config.embed_dim, num_heads=config.num_heads, fusion_window=config.fusion_window, fusion_mode=config.fusion_mode, k_simplex=config.k_simplex, use_beatrix_routing=config.use_beatrix, use_consciousness_weighting=(config.fusion_mode == "consciousness"), beatrix_tau=config.beatrix_tau, use_gating=True, dropout=config.dropout, residual=False, precompute_staircase=config.precompute_geometric, precompute_routes=config.precompute_geometric, precompute_distances=config.precompute_geometric, use_optimized_gather=True, staircase_cache_sizes=[config.num_patches], use_torch_compile=config.use_torch_compile ) self.fusion = CantorMultiheadFusion(fusion_config) self.norm2 = nn.LayerNorm(config.embed_dim) mlp_hidden = config.embed_dim * 4 self.mlp = nn.Sequential( nn.Linear(config.embed_dim, mlp_hidden), nn.GELU(), nn.Dropout(config.dropout), nn.Linear(mlp_hidden, config.embed_dim), nn.Dropout(config.dropout) ) self.drop_path = DropPath(drop_path) if drop_path > 0 else nn.Identity() def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, Dict]]: fusion_result = self.fusion(self.norm1(x)) x = x + self.drop_path(fusion_result['output']) x = x + self.drop_path(self.mlp(self.norm2(x))) if return_fusion_info: fusion_info = { 'consciousness': fusion_result.get('consciousness'), 'cantor_measure': fusion_result.get('cantor_measure') } return x, fusion_info return x class CantorClassifier(nn.Module): """Cantor fusion classifier.""" def __init__(self, config: CantorTrainingConfig): super().__init__() self.config = config self.patch_embed = PatchEmbedding(config) dpr = [x.item() for x in torch.linspace(0, config.drop_path_rate, config.num_fusion_blocks)] self.blocks = nn.ModuleList([ CantorFusionBlock(config, drop_path=dpr[i]) for i in range(config.num_fusion_blocks) ]) self.norm = nn.LayerNorm(config.embed_dim) self.head = nn.Linear(config.embed_dim, config.num_classes) self.apply(self._init_weights) def _init_weights(self, m): if isinstance(m, nn.Linear): nn.init.trunc_normal_(m.weight, std=0.02) if m.bias is not None: nn.init.constant_(m.bias, 0) elif isinstance(m, nn.LayerNorm): nn.init.constant_(m.bias, 0) nn.init.constant_(m.weight, 1.0) elif isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu') def forward(self, x: torch.Tensor, return_fusion_info: bool = False) -> Union[torch.Tensor, Tuple[torch.Tensor, List[Dict]]]: x = self.patch_embed(x) fusion_infos = [] for i, block in enumerate(self.blocks): if return_fusion_info and i == len(self.blocks) - 1: x, fusion_info = block(x, return_fusion_info=True) fusion_infos.append(fusion_info) else: x = block(x) x = self.norm(x) x = x.mean(dim=1) logits = self.head(x) if return_fusion_info: return logits, fusion_infos return logits # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # HuggingFace 
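# Minimal forward-pass sketch for the classifier above, assuming the
# geovocab2 package is importable; note that constructing
# CantorTrainingConfig creates its output directories as a side effect.
# Not called during training - a smoke test to run by hand.
def _smoke_test_classifier():
    cfg = CantorTrainingConfig(dataset="cifar10", upload_to_hf=False,
                               use_torch_compile=False, device="cpu")
    model = CantorClassifier(cfg)
    x = torch.randn(2, 3, cfg.image_size, cfg.image_size)
    logits = model(x)  # expected shape: [2, cfg.num_classes]
    print(f"params={sum(p.numel() for p in model.parameters()):,}, "
          f"logits={tuple(logits.shape)}")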
Integration # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class HuggingFaceUploader: """Manages HuggingFace Hub uploads to ONE shared repo.""" def __init__(self, config: CantorTrainingConfig): self.config = config self.api = HfApi(token=config.hf_token) if config.upload_to_hf else None self.repo_id = f"{config.hf_username}/{config.hf_repo_name}" self.run_prefix = f"runs/{config.run_name}" if config.upload_to_hf: self._create_repo() self._update_main_readme() def _create_repo(self): """Create HuggingFace repo if it doesn't exist.""" try: create_repo( repo_id=self.repo_id, token=self.config.hf_token, exist_ok=True, private=False ) print(f"[HF] Repository: https://huggingface.co/{self.repo_id}") print(f"[HF] Run folder: {self.run_prefix}") except Exception as e: print(f"[HF] Warning: Could not create repo: {e}") def _update_main_readme(self): """Create or update the main shared README at repo root.""" if not self.config.upload_to_hf or self.api is None: return boost_info = "" if self.config.restart_lr_mult > 1.0: boost_info = f""" ### 🚀 LR Boost at Restarts (NEW!) This run uses **restart_lr_mult = {self.config.restart_lr_mult}x**: - Normal restart: 3e-4 → 1e-7 → restart at 3e-4 - **Boosted restart**: 3e-4 → 1e-7 → restart at {self.config.learning_rate * self.config.restart_lr_mult:.2e} ({self.config.restart_lr_mult}x!) - Creates **wider exploration curves** to escape solidified local minima - Each restart provides progressively stronger exploration boost """ main_readme = f"""--- tags: - image-classification - cantor-fusion - geometric-deep-learning - safetensors - vision-transformer - warm-restarts library_name: pytorch datasets: - cifar10 - cifar100 metrics: - accuracy --- # {self.config.hf_repo_name} **Geometric Deep Learning with Cantor Multihead Fusion + AdamW Warm Restarts** This repository contains multiple training runs using Cantor fusion architecture with pentachoron structures, geometric routing, and **CosineAnnealingWarmRestarts** for automatic exploration cycles. ## Training Strategy: AdamW + Warm Restarts This model uses **AdamW with Cosine Annealing Warm Restarts** (SGDR): - **Drop phase**: LR decays from {self.config.learning_rate} → {self.config.min_lr} over {self.config.restart_period} epochs - **Restart phase**: LR jumps back to {self.config.learning_rate} to explore new regions - **Cycle multiplier**: Each cycle is {self.config.restart_mult}x longer than previous - **Benefits**: Automatic exploration + exploitation, finds better minima, robust training {boost_info} ### Restart Schedule ``` Epochs 0-{self.config.restart_period}: LR: {self.config.learning_rate} → {self.config.min_lr} (first cycle) Epoch {self.config.restart_period}: LR: RESTART to {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} 🔄 Epochs {self.config.restart_period}-{self.config.restart_period * (1 + self.config.restart_mult)}: LR: {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} → {self.config.min_lr} (longer cycle) ... 
``` ## Current Run **Latest**: `{self.config.run_name}` - **Dataset**: {self.config.dataset.upper()} - **Fusion Mode**: {self.config.fusion_mode} - **Optimizer**: AdamW (adaptive moments) - **Scheduler**: CosineAnnealingWarmRestarts - **Restart LR Mult**: {self.config.restart_lr_mult}x - **Architecture**: {self.config.num_fusion_blocks} blocks, {self.config.num_heads} heads - **Simplex**: {self.config.k_simplex}-simplex ({self.config.k_simplex + 1} vertices) ## Architecture The Cantor Fusion architecture uses: - **Geometric Routing**: Pentachoron (5-simplex) structures for token routing - **Cantor Multihead Fusion**: Multiple fusion heads with geometric attention - **Beatrix Consciousness Routing**: Optional consciousness-aware token fusion - **SafeTensors Format**: All model weights use SafeTensors (not pickle) ## Usage ```python from huggingface_hub import hf_hub_download from safetensors.torch import load_file model_path = hf_hub_download( repo_id="{self.repo_id}", filename="runs/YOUR_RUN_NAME/checkpoints/best_model.safetensors" ) state_dict = load_file(model_path) model.load_state_dict(state_dict) ``` ## Citation ```bibtex @misc{{{self.config.hf_repo_name.replace('-', '_')}, author = {{AbstractPhil}}, title = {{{self.config.hf_repo_name}: Geometric Deep Learning with Warm Restarts}}, year = {{2025}}, publisher = {{HuggingFace}}, url = {{https://huggingface.co/{self.repo_id}}} }} ``` --- **Repository maintained by**: [@{self.config.hf_username}](https://huggingface.co/{self.config.hf_username}) **Latest update**: {time.strftime("%Y-%m-%d %H:%M:%S")} """ main_readme_path = Path(self.config.weights_dir) / self.config.model_name / "MAIN_README.md" main_readme_path.parent.mkdir(parents=True, exist_ok=True) with open(main_readme_path, 'w') as f: f.write(main_readme) try: upload_file( path_or_fileobj=str(main_readme_path), path_in_repo="README.md", repo_id=self.repo_id, token=self.config.hf_token ) print(f"[HF] Updated main README") except Exception as e: print(f"[HF] Main README upload failed: {e}") def upload_file(self, file_path: Path, repo_path: str): """Upload single file to HuggingFace.""" if not self.config.upload_to_hf or self.api is None: return try: if not repo_path.startswith(self.run_prefix) and not repo_path.startswith("runs/"): full_path = f"{self.run_prefix}/{repo_path}" else: full_path = repo_path upload_file( path_or_fileobj=str(file_path), path_in_repo=full_path, repo_id=self.repo_id, token=self.config.hf_token ) print(f"[HF] ✓ Uploaded: {full_path}") except Exception as e: print(f"[HF] ✗ Upload failed ({full_path}): {e}") def upload_folder_contents(self, folder_path: Path, repo_folder: str): """Upload entire folder to HuggingFace.""" if not self.config.upload_to_hf or self.api is None: return try: full_path = f"{self.run_prefix}/{repo_folder}" upload_folder( folder_path=str(folder_path), repo_id=self.repo_id, path_in_repo=full_path, token=self.config.hf_token, ignore_patterns=["*.pyc", "__pycache__"] ) print(f"[HF] Uploaded folder: {full_path}") except Exception as e: print(f"[HF] Folder upload failed: {e}") def create_model_card(self, trainer_stats: Dict): """Create and upload run-specific model card.""" if not self.config.upload_to_hf: return boost_section = "" if self.config.restart_lr_mult > 1.0: boost_section = f""" ### 🚀 LR Boost Feature This run uses **restart_lr_mult = {self.config.restart_lr_mult}x** for aggressive exploration: **How it works:** ``` Cycle 1: {self.config.learning_rate:.2e} → {self.config.min_lr:.2e} (standard convergence) Restart: → 
{self.config.learning_rate * self.config.restart_lr_mult:.2e} (BOOSTED!) Cycle 2: {self.config.learning_rate * self.config.restart_lr_mult:.2e} → {self.config.min_lr:.2e} (wider exploration) Restart: → {self.config.learning_rate * (self.config.restart_lr_mult ** 2):.2e} (EVEN MORE BOOSTED!) Cycle 3: {self.config.learning_rate * (self.config.restart_lr_mult ** 2):.2e} → {self.config.min_lr:.2e} ... ``` **Benefits:** - 🔓 **Escape solidified local minima** with aggressive LR spikes - 🌊 **Wider exploration curves** after each restart - 💪 **Progressively stronger exploration** as training proceeds - 🎯 **Combat training plateaus** that plague long runs """ run_card = f"""# Run: {self.config.run_name} ## Configuration - **Dataset**: {self.config.dataset.upper()} - **Fusion Mode**: {self.config.fusion_mode} - **Parameters**: {trainer_stats['total_params']:,} - **Simplex**: {self.config.k_simplex}-simplex ({self.config.k_simplex + 1} vertices) ## Performance - **Best Validation Accuracy**: {trainer_stats['best_acc']:.2f}% - **Training Time**: {trainer_stats['training_time']:.1f} hours - **Final Epoch**: {trainer_stats['final_epoch']} ## Training Setup: AdamW + Warm Restarts - **Optimizer**: AdamW (lr={self.config.learning_rate}, wd={self.config.weight_decay}) - **Scheduler**: CosineAnnealingWarmRestarts - **Restart Period (T_0)**: {self.config.restart_period} epochs - **Cycle Multiplier (T_mult)**: {self.config.restart_mult}x - **Restart LR Mult**: {self.config.restart_lr_mult}x {'🚀' if self.config.restart_lr_mult > 1.0 else ''} - **Min LR**: {self.config.min_lr} - **Batch Size**: {self.config.batch_size} - **Mixed Precision**: {trainer_stats.get('mixed_precision', False)} {boost_section} ### Learning Rate Schedule ``` Cycle 1: Epochs 0-{self.config.restart_period} LR: {self.config.learning_rate} → {self.config.min_lr} (drop) Expected: Convergence to local minimum Epoch {self.config.restart_period}: RESTART 🔄 LR: {self.config.min_lr} → {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} (jump{"!" if self.config.restart_lr_mult > 1.0 else ""}) Expected: Escape local minimum, explore new regions Cycle 2: Epochs {self.config.restart_period}-{self.config.restart_period * (1 + self.config.restart_mult)} LR: {self.config.learning_rate * self.config.restart_lr_mult if self.config.restart_lr_mult > 1.0 else self.config.learning_rate} → {self.config.min_lr} (longer cycle) Expected: Deeper convergence ... and so on ``` ## Files - `{self.run_prefix}/checkpoints/best_model.safetensors` - Model weights - `{self.run_prefix}/checkpoints/best_training_state.pt` - Optimizer state - `{self.run_prefix}/config.yaml` - Full configuration - `{self.run_prefix}/tensorboard/` - TensorBoard logs (LR tracking!) ## Usage ```python from safetensors.torch import load_file from huggingface_hub import hf_hub_download model_path = hf_hub_download( repo_id="{self.repo_id}", filename="{self.run_prefix}/checkpoints/best_model.safetensors" ) state_dict = load_file(model_path) model.load_state_dict(state_dict) ``` ## Training Notes **Warm Restarts Benefits:** - 🔄 **Exploration**: Periodic LR jumps escape local minima - 📉 **Exploitation**: Long drop phases converge deeply - 🎯 **Robustness**: Multiple restarts find better solutions - 📊 **Monitoring**: Watch TensorBoard for restart effects! 
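To watch the restart spikes live, point TensorBoard at this run's log folder (generic invocation; substitute the actual model and run names):

```bash
tensorboard --logdir weights/<model_name>/<run_name>/tensorboard
```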
**Expected Behavior:** - Accuracy improves during each drop phase - Brief accuracy dips after restarts (exploration) - Overall upward trend across cycles - Best models often found late in long cycles --- Built with geometric consciousness-aware routing using the Devil's Staircase (Beatrix) and pentachoron parameterization. **Training completed**: {time.strftime("%Y-%m-%d %H:%M:%S")} [← Back to main repository](https://huggingface.co/{self.repo_id}) """ readme_path = self.config.output_dir / "RUN_README.md" with open(readme_path, 'w') as f: f.write(run_card) try: upload_file( path_or_fileobj=str(readme_path), path_in_repo=f"{self.run_prefix}/README.md", repo_id=self.repo_id, token=self.config.hf_token ) print(f"[HF] Uploaded run README") except Exception as e: print(f"[HF] Run README upload failed: {e}") # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Trainer with AdamW + CosineAnnealingWarmRestarts + LR Boost # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class Trainer: """Training manager with AdamW + Warm Restarts + LR Boost.""" def __init__(self, config: CantorTrainingConfig): self.config = config self.device = torch.device(config.device) # Set seed torch.manual_seed(config.seed) if torch.cuda.is_available(): torch.cuda.manual_seed(config.seed) # Model print("\n" + "=" * 70) print(f"Initializing Cantor Classifier - {config.dataset.upper()}") print("=" * 70) init_start = time.time() self.model = CantorClassifier(config).to(self.device) init_time = time.time() - init_start print(f"\n[Model] Initialization time: {init_time:.2f}s") self.print_model_info() # Track restart epochs for logging self.restart_epochs = self._calculate_restart_epochs() # Optimizer self.optimizer = self.create_optimizer() # Scheduler self.scheduler = self.create_scheduler() # Loss self.criterion = nn.CrossEntropyLoss(label_smoothing=config.label_smoothing) # Mixing info self.use_mixing = config.use_mixing self.mixing_type = config.mixing_type self.mixing_prob = config.mixing_prob # Mixed precision self.use_amp = config.use_mixed_precision and config.device == "cuda" self.scaler = GradScaler() if self.use_amp else None if self.use_amp: print(f"[Training] Mixed precision enabled") # TensorBoard self.writer = SummaryWriter(log_dir=str(config.tensorboard_dir)) print(f"[TensorBoard] Logging to: {config.tensorboard_dir}") print(f"[Checkpoints] Format: SafeTensors (ClamAV safe)") # HuggingFace self.hf_uploader = HuggingFaceUploader(config) if config.upload_to_hf else None # Save config config.save(config.output_dir / "config.yaml") # Metrics self.best_acc = 0.0 self.global_step = 0 self.start_time = time.time() self.upload_count = 0 def apply_mixing(self, images: torch.Tensor, labels: torch.Tensor): """Apply mixing augmentation if enabled.""" if not self.use_mixing or torch.rand(1).item() > self.mixing_prob: return images, labels, None if self.mixing_type == "alphamix": mixed_images, y_a, y_b, alpha = alphamix_data( images, labels, alpha_range=self.config.mixing_alpha_range, spatial_ratio=self.config.mixing_spatial_ratio ) elif self.mixing_type == "fractal": mixed_images, y_a, y_b, alpha = alphamix_fractal( images, labels, alpha_range=self.config.mixing_alpha_range, steps_range=self.config.fractal_steps_range, triad_scales=self.config.fractal_triad_scales ) else: raise ValueError(f"Unknown mixing type: {self.mixing_type}") return mixed_images, (y_a, y_b, alpha), alpha def compute_mixed_loss(self, logits: torch.Tensor, mixed_labels): """Compute loss for mixed labels.""" if 
mixed_labels is None: # No mixing applied return None y_a, y_b, alpha = mixed_labels loss_a = self.criterion(logits, y_a) loss_b = self.criterion(logits, y_b) # Weighted combination based on mixing ratio # Use spatial_ratio for weighting (alpha represents transparency) loss = alpha * loss_a + (1 - alpha) * loss_b return loss def _calculate_restart_epochs(self) -> List[int]: """Calculate when restarts will occur.""" if self.config.scheduler_type != "cosine_restarts": return [] restarts = [] current = self.config.restart_period period = self.config.restart_period while current < self.config.num_epochs: restarts.append(current) period *= self.config.restart_mult current += period return restarts def create_optimizer(self): """Create optimizer based on config.""" if self.config.optimizer_type == "sgd": print(f"\n[Optimizer] SGD") print(f" LR: {self.config.learning_rate}") print(f" Momentum: {self.config.sgd_momentum}") print(f" Nesterov: {self.config.sgd_nesterov}") print(f" Weight decay: {self.config.weight_decay}") return torch.optim.SGD( self.model.parameters(), lr=self.config.learning_rate, momentum=self.config.sgd_momentum, weight_decay=self.config.weight_decay, nesterov=self.config.sgd_nesterov ) elif self.config.optimizer_type == "adamw": print(f"\n[Optimizer] AdamW") print(f" LR: {self.config.learning_rate}") print(f" Betas: {self.config.adamw_betas}") print(f" Weight decay: {self.config.weight_decay}") return torch.optim.AdamW( self.model.parameters(), lr=self.config.learning_rate, betas=self.config.adamw_betas, eps=self.config.adamw_eps, weight_decay=self.config.weight_decay ) else: raise ValueError(f"Unknown optimizer: {self.config.optimizer_type}") def create_scheduler(self): """Create LR scheduler based on config.""" if self.config.scheduler_type == "cosine_restarts": print(f"\n[Scheduler] CosineAnnealingWarmRestarts with LR Boost") print(f" T_0 (restart period): {self.config.restart_period} epochs") print(f" T_mult (cycle multiplier): {self.config.restart_mult}x") print(f" Restart LR mult: {self.config.restart_lr_mult}x {'🚀' if self.config.restart_lr_mult > 1.0 else ''}") print(f" Min LR: {self.config.min_lr}") if self.config.restart_lr_mult > 1.0: print(f"\n 🚀 BOOST MODE ENABLED!") print(f" Baseline LR: {self.config.learning_rate:.2e}") boosted_lrs = [self.config.learning_rate * (self.config.restart_lr_mult ** i) for i in range(1, min(4, len(self.restart_epochs) + 1))] for i, lr in enumerate(boosted_lrs): print(f" After restart #{i+1}: {lr:.2e} ({self.config.restart_lr_mult**(i+1):.2f}x)") print(f" → Creates wider exploration curves to escape local minima!") print(f"\n Restart schedule:") for i, epoch in enumerate(self.restart_epochs[:5]): # Show first 5 mult = self.config.restart_lr_mult ** (i + 1) if self.config.restart_lr_mult > 1.0 else 1.0 print(f" Restart #{i+1}: Epoch {epoch} (LR: {self.config.learning_rate * mult:.2e})") if len(self.restart_epochs) > 5: print(f" ... 
and {len(self.restart_epochs) - 5} more") return CosineAnnealingWarmRestartsWithBoost( self.optimizer, T_0=self.config.restart_period, T_mult=self.config.restart_mult, eta_min=self.config.min_lr, restart_lr_mult=self.config.restart_lr_mult ) elif self.config.scheduler_type == "multistep": print(f"\n[Scheduler] MultiStepLR") print(f" Milestones: {self.config.lr_milestones}") print(f" Gamma: {self.config.lr_gamma}") return torch.optim.lr_scheduler.MultiStepLR( self.optimizer, milestones=self.config.lr_milestones, gamma=self.config.lr_gamma ) elif self.config.scheduler_type == "cosine": print(f"\n[Scheduler] Cosine annealing with warmup") print(f" Warmup epochs: {self.config.warmup_epochs}") print(f" Min LR: {self.config.min_lr}") def lr_lambda(epoch): if epoch < self.config.warmup_epochs: return (epoch + 1) / self.config.warmup_epochs progress = (epoch - self.config.warmup_epochs) / (self.config.num_epochs - self.config.warmup_epochs) return 0.5 * (1 + math.cos(math.pi * progress)) return torch.optim.lr_scheduler.LambdaLR(self.optimizer, lr_lambda) else: raise ValueError(f"Unknown scheduler: {self.config.scheduler_type}") def print_model_info(self): """Print model info.""" total_params = sum(p.numel() for p in self.model.parameters()) print(f"\nParameters: {total_params:,}") print(f"Dataset: {self.config.dataset.upper()}") print(f"Classes: {self.config.num_classes}") print(f"Fusion mode: {self.config.fusion_mode}") print(f"Optimizer: {self.config.optimizer_type.upper()}") print(f"Scheduler: {self.config.scheduler_type}") if self.config.restart_lr_mult > 1.0: print(f"LR Boost: {self.config.restart_lr_mult}x at restarts 🚀") if self.config.use_mixing: print(f"Mixing: {self.config.mixing_type} (prob={self.config.mixing_prob})") print(f"Output: {self.config.output_dir}") def train_epoch(self, train_loader: DataLoader, epoch: int) -> Tuple[float, float]: """Train one epoch.""" self.model.train() total_loss, correct, total = 0.0, 0, 0 mixing_applied_count = 0 total_batches = 0 # Check if this is a restart epoch is_restart = (epoch in self.restart_epochs) epoch_desc = f"Epoch {epoch+1}/{self.config.num_epochs}" if is_restart: restart_num = self.restart_epochs.index(epoch) + 1 boost_mult = self.config.restart_lr_mult ** restart_num if self.config.restart_lr_mult > 1.0 else 1.0 epoch_desc += f" 🔄 RESTART #{restart_num}" if self.config.restart_lr_mult > 1.0: epoch_desc += f" ({boost_mult:.2f}x)" pbar = tqdm(train_loader, desc=f"{epoch_desc} [Train]") for batch_idx, (images, labels) in enumerate(pbar): images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True) # Apply mixing augmentation original_labels = labels mixed_images, mixed_labels_info, mixing_alpha = self.apply_mixing(images, labels) if mixing_alpha is not None: mixing_applied_count += 1 images = mixed_images total_batches += 1 # Forward if self.use_amp: with autocast(): logits = self.model(images) # Compute loss (handle mixed labels) if mixing_alpha is not None: loss = self.compute_mixed_loss(logits, mixed_labels_info) else: loss = self.criterion(logits, labels) self.optimizer.zero_grad(set_to_none=True) self.scaler.scale(loss).backward() self.scaler.unscale_(self.optimizer) torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip) self.scaler.step(self.optimizer) self.scaler.update() else: logits = self.model(images) # Compute loss (handle mixed labels) if mixing_alpha is not None: loss = self.compute_mixed_loss(logits, mixed_labels_info) else: loss = self.criterion(logits, 
labels) self.optimizer.zero_grad(set_to_none=True) loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_clip) self.optimizer.step() # Metrics (use original labels for accuracy) total_loss += loss.item() _, predicted = logits.max(1) correct += predicted.eq(original_labels).sum().item() total += original_labels.size(0) # TensorBoard logging if batch_idx % self.config.log_interval == 0: current_lr = self.scheduler.get_last_lr()[0] self.writer.add_scalar('train/loss', loss.item(), self.global_step) self.writer.add_scalar('train/accuracy', 100. * correct / total, self.global_step) self.writer.add_scalar('train/learning_rate', current_lr, self.global_step) if mixing_alpha is not None: self.writer.add_scalar('train/mixing_alpha', mixing_alpha, self.global_step) self.global_step += 1 postfix_dict = { 'loss': f'{loss.item():.4f}', 'acc': f'{100. * correct / total:.2f}%', 'lr': f'{self.scheduler.get_last_lr()[0]:.6f}' } if self.use_mixing: mix_pct = 100.0 * mixing_applied_count / total_batches postfix_dict['mix'] = f'{mix_pct:.0f}%' pbar.set_postfix(postfix_dict) return total_loss / len(train_loader), 100. * correct / total @torch.no_grad() def evaluate(self, val_loader: DataLoader, epoch: int) -> Tuple[float, Dict]: """Evaluate.""" self.model.eval() total_loss, correct, total = 0.0, 0, 0 consciousness_values = [] pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{self.config.num_epochs} [Val] ") for batch_idx, (images, labels) in enumerate(pbar): images, labels = images.to(self.device, non_blocking=True), labels.to(self.device, non_blocking=True) # Forward with fusion info on last batch return_info = (batch_idx == len(val_loader) - 1) if self.use_amp: with autocast(): if return_info: logits, fusion_infos = self.model(images, return_fusion_info=True) if fusion_infos and fusion_infos[0].get('consciousness') is not None: consciousness_values.append(fusion_infos[0]['consciousness'].mean().item()) else: logits = self.model(images) loss = self.criterion(logits, labels) else: if return_info: logits, fusion_infos = self.model(images, return_fusion_info=True) if fusion_infos and fusion_infos[0].get('consciousness') is not None: consciousness_values.append(fusion_infos[0]['consciousness'].mean().item()) else: logits = self.model(images) loss = self.criterion(logits, labels) total_loss += loss.item() _, predicted = logits.max(1) correct += predicted.eq(labels).sum().item() total += labels.size(0) pbar.set_postfix({ 'loss': f'{total_loss / (batch_idx + 1):.4f}', 'acc': f'{100. * correct / total:.2f}%' }) avg_loss = total_loss / len(val_loader) accuracy = 100. 
* correct / total # TensorBoard logging self.writer.add_scalar('val/loss', avg_loss, epoch) self.writer.add_scalar('val/accuracy', accuracy, epoch) if consciousness_values: self.writer.add_scalar('val/consciousness', sum(consciousness_values) / len(consciousness_values), epoch) metrics = { 'loss': avg_loss, 'accuracy': accuracy, 'consciousness': sum(consciousness_values) / len(consciousness_values) if consciousness_values else None } return accuracy, metrics def train(self, train_loader: DataLoader, val_loader: DataLoader): """Full training loop.""" print("\n" + "=" * 70) print("Starting training with AdamW + Warm Restarts" + (" + LR Boost 🚀" if self.config.restart_lr_mult > 1.0 else "")) print(f"Optimizer: {self.config.optimizer_type.upper()}") print(f"Scheduler: {self.config.scheduler_type}") print(f"Restart period: {self.config.restart_period} epochs (T_0)") print(f"Cycle multiplier: {self.config.restart_mult}x (T_mult)") if self.config.restart_lr_mult > 1.0: print(f"LR boost multiplier: {self.config.restart_lr_mult}x 🚀") print(f"Total restarts: {len(self.restart_epochs)}") print("=" * 70 + "\n") for epoch in range(self.config.num_epochs): # Train train_loss, train_acc = self.train_epoch(train_loader, epoch) # Evaluate val_acc, val_metrics = self.evaluate(val_loader, epoch) # Update scheduler self.scheduler.step() # Check if this is a restart epoch or next epoch is a restart is_restart = (epoch in self.restart_epochs) next_is_restart = ((epoch + 1) in self.restart_epochs) next_lr = self.scheduler.get_last_lr()[0] # Print summary print(f"\n{'='*70}") print(f"Epoch [{epoch + 1}/{self.config.num_epochs}] Summary:") print(f" Train: Loss={train_loss:.4f}, Acc={train_acc:.2f}%") print(f" Val: Loss={val_metrics['loss']:.4f}, Acc={val_acc:.2f}%") if val_metrics['consciousness']: print(f" Consciousness: {val_metrics['consciousness']:.4f}") if next_is_restart: restart_num = self.restart_epochs.index(epoch + 1) + 1 boost_mult = self.config.restart_lr_mult ** restart_num if self.config.restart_lr_mult > 1.0 else 1.0 print(f" Next LR: {next_lr:.6f}") print(f" ⚠️ RESTART COMING! Next epoch will jump to {next_lr * self.config.restart_lr_mult:.6f}") if self.config.restart_lr_mult > 1.0: print(f" 🚀 Boosted exploration: {boost_mult:.2f}x baseline!") print(f" (Breaking out of solidified local minima)") elif is_restart: restart_num = self.restart_epochs.index(epoch) + 1 boost_mult = self.config.restart_lr_mult ** restart_num if self.config.restart_lr_mult > 1.0 else 1.0 print(f" 🔄 WARM RESTART #{restart_num}! Current LR: {next_lr:.6f}") if self.config.restart_lr_mult > 1.0: print(f" 🚀 Exploration boost: {boost_mult:.2f}x baseline") print(f" (Wider curve for aggressive exploration)") else: print(f" Current LR: {next_lr:.6f}") # Checkpoint logic is_best = val_acc > self.best_acc should_save_regular = ((epoch + 1) % self.config.save_interval == 0) should_upload_regular = ((epoch + 1) % self.config.checkpoint_upload_interval == 0) if is_best: self.best_acc = val_acc print(f" ✓ New best model! 
Accuracy: {val_acc:.2f}%") self.save_checkpoint(epoch, val_acc, prefix="best", upload=should_upload_regular) if should_save_regular: self.save_checkpoint(epoch, val_acc, prefix=f"epoch_{epoch+1}", upload=should_upload_regular) print(f" HF Uploads: {self.upload_count}") print(f"{'='*70}\n") # Flush TensorBoard if (epoch + 1) % 10 == 0: self.writer.flush() # Training complete training_time = (time.time() - self.start_time) / 3600 print("\n" + "=" * 70) print("Training Complete!") print(f"Best Validation Accuracy: {self.best_acc:.2f}%") print(f"Training Time: {training_time:.2f} hours") print(f"Total Uploads: {self.upload_count}") print(f"Warm Restarts: {len(self.restart_epochs)}") if self.config.restart_lr_mult > 1.0: print(f"LR Boost: {self.config.restart_lr_mult}x (helped escape local minima! 🚀)") print("=" * 70) # Upload to HuggingFace if self.hf_uploader: print("\n[HF] Uploading final best model...") best_model_path = self.config.checkpoint_dir / "best_model.safetensors" best_state_path = self.config.checkpoint_dir / "best_training_state.pt" best_metadata_path = self.config.checkpoint_dir / "best_metadata.json" config_path = self.config.output_dir / "config.yaml" if best_model_path.exists(): self.hf_uploader.upload_file(best_model_path, "checkpoints/best_model.safetensors") if best_state_path.exists(): self.hf_uploader.upload_file(best_state_path, "checkpoints/best_training_state.pt") if best_metadata_path.exists(): self.hf_uploader.upload_file(best_metadata_path, "checkpoints/best_metadata.json") if config_path.exists(): self.hf_uploader.upload_file(config_path, "config.yaml") print("[HF] Final upload: TensorBoard logs...") self.hf_uploader.upload_folder_contents(self.config.tensorboard_dir, "tensorboard") trainer_stats = { 'total_params': sum(p.numel() for p in self.model.parameters()), 'best_acc': self.best_acc, 'training_time': training_time, 'final_epoch': self.config.num_epochs, 'batch_size': self.config.batch_size, 'mixed_precision': self.use_amp } self.hf_uploader.create_model_card(trainer_stats) self.writer.close() def save_checkpoint(self, epoch: int, accuracy: float, prefix: str = "checkpoint", upload: bool = False): """Save checkpoint as safetensors with selective upload.""" checkpoint_dir = self.config.checkpoint_dir checkpoint_dir.mkdir(parents=True, exist_ok=True) # 1. Save model weights as safetensors model_path = checkpoint_dir / f"{prefix}_model.safetensors" save_file(self.model.state_dict(), str(model_path)) # 2. Save optimizer/scheduler state training_state = { 'optimizer_state_dict': self.optimizer.state_dict(), 'scheduler_state_dict': self.scheduler.state_dict(), } if self.scaler is not None: training_state['scaler_state_dict'] = self.scaler.state_dict() training_state_path = checkpoint_dir / f"{prefix}_training_state.pt" torch.save(training_state, training_state_path) # 3. 
Save metadata metadata = { 'epoch': epoch, 'accuracy': accuracy, 'best_accuracy': self.best_acc, 'global_step': self.global_step, 'timestamp': time.strftime("%Y-%m-%d %H:%M:%S"), 'optimizer': self.config.optimizer_type, 'scheduler': self.config.scheduler_type, 'learning_rate': self.scheduler.get_last_lr()[0], 'restart_lr_mult': self.config.restart_lr_mult } metadata_path = checkpoint_dir / f"{prefix}_metadata.json" with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) is_best = (prefix == "best") if is_best: print(f" 💾 Saved best: {prefix}_model.safetensors") else: print(f" 💾 Saved: {prefix}_model.safetensors", end="") # Upload to HuggingFace if self.hf_uploader and upload: self.hf_uploader.upload_file(model_path, f"checkpoints/{prefix}_model.safetensors") self.hf_uploader.upload_file(training_state_path, f"checkpoints/{prefix}_training_state.pt") self.hf_uploader.upload_file(metadata_path, f"checkpoints/{prefix}_metadata.json") if is_best: config_path = self.config.output_dir / "config.yaml" if config_path.exists(): self.hf_uploader.upload_file(config_path, "config.yaml") self.upload_count += 1 if not is_best: print(" → Uploaded to HF") else: if not is_best: print(" (local only)") # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Data Loading (with Cutout) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class Cutout: """Cutout data augmentation.""" def __init__(self, length: int): self.length = length def __call__(self, img): h, w = img.size(1), img.size(2) mask = torch.ones((h, w), dtype=torch.float32) y = torch.randint(h, (1,)).item() x = torch.randint(w, (1,)).item() y1 = max(0, y - self.length // 2) y2 = min(h, y + self.length // 2) x1 = max(0, x - self.length // 2) x2 = min(w, x + self.length // 2) mask[y1:y2, x1:x2] = 0. 
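        # The [H, W] mask now has a zeroed square; broadcasting it across the
        # channel dimension blanks the same region in every channel below.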
mask = mask.expand_as(img) return img * mask def get_data_loaders(config: CantorTrainingConfig) -> Tuple[DataLoader, DataLoader]: """Create data loaders.""" # Normalization mean = (0.4914, 0.4822, 0.4465) std = (0.2470, 0.2435, 0.2616) # Augmentation if config.use_augmentation: transforms_list = [] if config.use_autoaugment: policy = transforms.AutoAugmentPolicy.CIFAR10 transforms_list.append(transforms.AutoAugment(policy)) else: transforms_list.extend([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), ]) transforms_list.append(transforms.ToTensor()) transforms_list.append(transforms.Normalize(mean, std)) if config.use_cutout: transforms_list.append(Cutout(config.cutout_length)) train_transform = transforms.Compose(transforms_list) else: train_transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize(mean, std) ]) val_transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize(mean, std) ]) # Dataset selection if config.dataset == "cifar10": train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=train_transform) val_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=val_transform) elif config.dataset == "cifar100": train_dataset = datasets.CIFAR100(root='./data', train=True, download=True, transform=train_transform) val_dataset = datasets.CIFAR100(root='./data', train=False, download=True, transform=val_transform) else: raise ValueError(f"Unknown dataset: {config.dataset}") train_loader = DataLoader( train_dataset, batch_size=config.batch_size, shuffle=True, num_workers=config.num_workers, pin_memory=(config.device == "cuda") ) val_loader = DataLoader( val_dataset, batch_size=config.batch_size, shuffle=False, num_workers=config.num_workers, pin_memory=(config.device == "cuda") ) return train_loader, val_loader # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # Main - AdamW + CosineAnnealingWarmRestarts + LR Boost # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ def main(): """Main training function with AdamW + Warm Restarts + LR Boost.""" # ═══════════════════════════════════════════════════════════════════ # Configuration - AdamW with Cosine Annealing Warm Restarts + LR BOOST # ═══════════════════════════════════════════════════════════════════ config = CantorTrainingConfig( # Dataset dataset="cifar100", # Architecture embed_dim=512, num_fusion_blocks=12, num_heads=8, fusion_mode="consciousness", k_simplex=4, use_beatrix=True, fusion_window=32, # Optimizer: AdamW optimizer_type="adamw", learning_rate=1e-4, weight_decay=0.005, # Stronger regularization adamw_betas=(0.9, 0.999), # Scheduler: Cosine Annealing with Warm Restarts + LR BOOST scheduler_type="cosine_restarts", restart_period=40, restart_mult=1.5, # Consistent cycle growth restart_lr_mult=1.25, # 🚀 NEW! 
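        # Boosts compound across restarts (1.25x -> ~1.56x after two), so a
        # modest multiplier keeps late cycles from diverging.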
Boost LR at restarts min_lr=1e-7, # Training num_epochs=200, batch_size=256, grad_clip=1.0, label_smoothing=0.15, # Augmentation use_augmentation=True, use_autoaugment=True, use_cutout=True, cutout_length=16, # Mixing augmentation (AlphaMix) use_mixing=True, # Enable mixing mixing_type="alphamix", # "alphamix" or "fractal" mixing_alpha_range=(0.3, 0.7), mixing_spatial_ratio=0.25, mixing_prob=0.5, # Apply to 50% of batches # Regularization dropout=0.1, drop_path_rate=0.15, # System device="cuda", use_mixed_precision=False, # HuggingFace hf_username="AbstractPhil", upload_to_hf=True, checkpoint_upload_interval=25, ) print("=" * 70) print(f"Cantor Fusion Classifier - {config.dataset.upper()}") print("Training Strategy: AdamW + Cosine Annealing Warm Restarts") if config.restart_lr_mult > 1.0: print("🚀 WITH LR BOOST AT RESTARTS 🚀") print("=" * 70) print(f"\nConfiguration:") print(f" Dataset: {config.dataset}") print(f" Fusion mode: {config.fusion_mode}") print(f" Optimizer: AdamW") print(f" Scheduler: CosineAnnealingWarmRestarts") print(f" Initial LR: {config.learning_rate}") print(f" Min LR: {config.min_lr}") print(f" Restart period (T_0): {config.restart_period} epochs") print(f" Cycle multiplier (T_mult): {config.restart_mult}x") if config.restart_lr_mult > 1.0: print(f" 🚀 Restart LR mult: {config.restart_lr_mult}x (BOOST MODE!)") if config.use_mixing: print(f" 🎨 Mixing: {config.mixing_type} (prob={config.mixing_prob})") print(f" Total epochs: {config.num_epochs}") # Calculate restart schedule restarts = [] current = config.restart_period period = config.restart_period while current < config.num_epochs: restarts.append(current) period *= config.restart_mult current += period print(f"\n Restart schedule ({len(restarts)} restarts):") for i, epoch in enumerate(restarts[:5]): boost_mult = config.restart_lr_mult ** (i + 1) if config.restart_lr_mult > 1.0 else 1.0 lr = config.learning_rate * boost_mult boost_str = f" ({boost_mult:.2f}x 🚀)" if config.restart_lr_mult > 1.0 else "" print(f" Restart #{i+1}: Epoch {epoch} → LR: {lr:.2e}{boost_str}") if len(restarts) > 5: print(f" ... 
and {len(restarts) - 5} more") print(f"\n Output: {config.output_dir}") print(f" HuggingFace: {'Enabled' if config.upload_to_hf else 'Disabled'}") if config.upload_to_hf: print(f" Repo: {config.hf_username}/{config.hf_repo_name}") print(f" Run: {config.run_name}") if config.restart_lr_mult > 1.0: print("\n" + "=" * 70) print("🚀 LR BOOST MODE - Expected Training Behavior:") print("=" * 70) print(f"📉 Cycle 1 (epochs 0-{config.restart_period}):") print(f" LR: {config.learning_rate:.2e} → {config.min_lr:.2e} (smooth drop)") print(" Expected: Convergence to local minimum") print("") print(f"🔄 Epoch {config.restart_period}: RESTART WITH BOOST!") boosted_lr = config.learning_rate * config.restart_lr_mult print(f" LR: {config.min_lr:.2e} → {boosted_lr:.2e} ({config.restart_lr_mult}x BOOST!)") print(" Expected: AGGRESSIVE exploration, escape local minimum") print(f" Benefit: Wider curve ({(config.restart_lr_mult - 1) * 100:.0f}% more exploration)") print("") print(f"📉 Cycle 2 (epochs {config.restart_period}-{int(config.restart_period * (1 + config.restart_mult))}):") print(f" LR: {boosted_lr:.2e} → {config.min_lr:.2e} (longer cycle)") print(" Expected: Deeper convergence from better starting point") print("") print(f"🔄 Epoch {int(config.restart_period * (1 + config.restart_mult))}: EVEN BIGGER BOOST!") boosted_lr2 = config.learning_rate * (config.restart_lr_mult ** 2) print(f" LR: {config.min_lr:.2e} → {boosted_lr2:.2e} ({config.restart_lr_mult**2:.2f}x!)") print(" Expected: VERY aggressive exploration") print("") print("🎯 Benefits:") print(" - Escape solidified local minima with LR spikes") print(" - Each restart explores WIDER than baseline") print(" - Progressive boost helps late-training plateaus") print(" - Automatic fracturing of failure modes") print("=" * 70) # Load data print("\nLoading data...") train_loader, val_loader = get_data_loaders(config) print(f" Train: {len(train_loader.dataset)} samples") print(f" Val: {len(val_loader.dataset)} samples") # Train trainer = Trainer(config) trainer.train(train_loader, val_loader) print("\n" + "=" * 70) print("🎯 Training complete!") if config.restart_lr_mult > 1.0: print(" Check TensorBoard to see the BOOSTED warm restart cycles!") else: print(" Check TensorBoard to see the warm restart cycles!") print(f" tensorboard --logdir {config.tensorboard_dir}") print("") print(" Look for:") print(" - Smooth LR drops during each cycle") if config.restart_lr_mult > 1.0: print(" - 🚀 BOOSTED LR jumps at restart epochs") print(" - Wider exploration curves after restarts") else: print(" - Sharp LR jumps at restart epochs") print(" - Accuracy improvements across cycles") print("=" * 70) if __name__ == "__main__": main()
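# Hedged restore sketch (paths mirror save_checkpoint() above; the config must
# match the run that produced the weights - the run name below is a placeholder).
def load_best_for_eval(run_dir, config: CantorTrainingConfig) -> CantorClassifier:
    """Rebuild the model and load best_model.safetensors for inference."""
    model = CantorClassifier(config).to(config.device)
    state = load_file(str(Path(run_dir) / "checkpoints" / "best_model.safetensors"))
    model.load_state_dict(state)
    model.eval()
    return model

# usage:
#   cfg = CantorTrainingConfig.load("weights/vit-beans-v3/<run_name>/config.yaml")
#   model = load_best_for_eval(cfg.output_dir, cfg)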