#!/usr/bin/env python3 """ CogNet-1B — Lanceur d'entraînement Python pur =============================================== Remplace acil_submit.sh — tout est en Python ! Détecte les GPUs automatiquement, prépare les données, lance l'entraînement multi-GPU avec torchrun si nécessaire. Usage: # Simple — tout automatique python run.py # Avec options python run.py --max-steps 100000 --batch-size 4 --hf-token hf_xxxx # Reprendre un checkpoint python run.py --resume ./checkpoints_1b/cognet_1b_latest.pt # Seulement préparer les données python run.py --prep-only # Sur un cluster avec SLURM (soumission auto) python run.py --slurm --time 72:00:00 --gpus 4 """ import argparse import os import signal import subprocess import sys import time import json import shutil from datetime import datetime from pathlib import Path # ═══════════════════════════════════════════════════════════════════ # Configuration par défaut # ═══════════════════════════════════════════════════════════════════ DEFAULTS = { 'model_size': '1b', 'batch_size': 4, 'grad_accum': 8, 'seq_len': 512, 'max_lr': 1e-4, 'min_lr': 1e-5, 'warmup_steps': 2000, 'max_steps': 100000, 'ckpt_dir': './checkpoints_1b', 'data_dir': './data_1b', 'save_every': 2000, 'eval_every': 500, 'log_every': 50, 'weight_decay': 0.1, 'grad_clip': 1.0, } WORKSPACE = os.path.dirname(os.path.abspath(__file__)) TRAIN_SCRIPT = os.path.join(WORKSPACE, 'train_ultra.py') # ═══════════════════════════════════════════════════════════════════ # Détection GPU # ═══════════════════════════════════════════════════════════════════ def detect_gpus(): """Détecte le nombre de GPUs disponibles.""" try: result = subprocess.run( ['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader,nounits'], capture_output=True, text=True, timeout=10 ) if result.returncode != 0: return 0, [] lines = [l.strip() for l in result.stdout.strip().split('\n') if l.strip()] gpus = [] for line in lines: parts = line.split(',') name = parts[0].strip() vram = float(parts[1].strip()) if len(parts) > 1 else 0 gpus.append({'name': name, 'vram_mb': vram}) return len(gpus), gpus except Exception: # Fallback: torch try: import torch count = torch.cuda.device_count() gpus = [] for i in range(count): name = torch.cuda.get_device_name(i) vram = torch.cuda.get_device_properties(i).total_mem / 1e6 # MB gpus.append({'name': name, 'vram_mb': vram}) return count, gpus except Exception: return 0, [] def get_gpu_type(gpus): """Retourne le type de GPU (A100, H100, etc.).""" if not gpus: return 'CPU' name = gpus[0]['name'].upper() if 'H100' in name: return 'H100' elif 'A100' in name: return 'A100' elif 'A6000' in name: return 'A6000' elif '4090' in name: return 'RTX4090' elif '3090' in name: return 'RTX3090' elif 'V100' in name: return 'V100' return gpus[0]['name'] # NOTE: Les estimations de temps seront calculées dynamiquement # par le vrai benchmark au début du training dans train_ultra.py. # Plus aucune estimation fabriquée ici. # ═══════════════════════════════════════════════════════════════════ # Préparation des données (Python) # ═══════════════════════════════════════════════════════════════════ def prepare_data_python(data_dir, hf_token='', skip=False): """Lance la préparation des données via train_ultra.py.""" if skip: print('[DATA] Skip (--skip-data-prep)') return True merged = os.path.join(data_dir, 'train_merged.pt') if os.path.exists(merged): size_mb = os.path.getsize(merged) / 1e6 print(f'[DATA] Déjà préparé: {merged} ({size_mb:.0f} MB)') return True print('[DATA] Préparation des datasets (HF + AICL + synthetic)...') env = os.environ.copy() if hf_token: env['HF_TOKEN'] = hf_token cmd = [sys.executable, TRAIN_SCRIPT, '--max-steps', '0', '--skip-data-prep'] # Note: --max-steps 0 avec --skip-data-prep ne fait rien # On doit lancer sans --skip-data-prep pour que la data prep se fasse cmd = [sys.executable, TRAIN_SCRIPT, '--max-steps', '0'] try: result = subprocess.run(cmd, env=env, cwd=WORKSPACE, timeout=7200) # 2h max if result.returncode != 0: print(f'[DATA] ERREUR: data prep a échoué (code {result.returncode})') return False except subprocess.TimeoutExpired: print('[DATA] ERREUR: data prep a timeout (2h)') return False except Exception as e: print(f'[DATA] ERREUR: {e}') return False if os.path.exists(merged): size_mb = os.path.getsize(merged) / 1e6 print(f'[DATA] Préparation terminée: {merged} ({size_mb:.0f} MB)') return True print('[DATA] ERREUR: fichier merged non trouvé après préparation') return False # ═══════════════════════════════════════════════════════════════════ # Vérification des dépendances # ═══════════════════════════════════════════════════════════════════ def check_dependencies(): """Vérifie que les dépendances Python sont installées.""" required = ['torch', 'datasets', 'huggingface_hub', 'tokenizers'] missing = [] for pkg in required: try: __import__(pkg) except ImportError: missing.append(pkg) # Vérification optionnelle optional_missing = [] try: import bitsandbytes except ImportError: optional_missing.append('bitsandbytes (optionnel: 8-bit optimizer)') return missing, optional_missing def install_dependencies(packages): """Installe les packages manquants.""" for pkg in packages: print(f'[INSTALL] Installation de {pkg}...') subprocess.run([sys.executable, '-m', 'pip', 'install', pkg, '-q'], check=False) # ═══════════════════════════════════════════════════════════════════ # Lancement de l'entraînement # ═══════════════════════════════════════════════════════════════════ def launch_training(args, num_gpus): """Lance l'entraînement — torchrun si multi-GPU, sinon python direct.""" # Construction des arguments communs common_args = [ '--model-size', str(args.model_size), '--batch-size', str(args.batch_size), '--grad-accum', str(args.grad_accum), '--seq-len', str(args.seq_len), '--max-lr', str(args.max_lr), '--min-lr', str(args.min_lr), '--warmup-steps', str(args.warmup_steps), '--max-steps', str(args.max_steps), '--ckpt-dir', str(args.ckpt_dir), '--save-every', str(args.save_every), '--eval-every', str(args.eval_every), '--log-every', str(args.log_every), '--weight-decay', str(args.weight_decay), '--grad-clip', str(args.grad_clip), ] # Optimisations V2 — toutes activées par défaut if args.bf16: common_args.append('--bf16') if args.compile: common_args.append('--compile') if args.cuda_prefetch: common_args.append('--cuda-prefetch') if args.seq_warmup: common_args.append('--seq-warmup') if args.async_ckpt: common_args.append('--async-ckpt') if args.use_8bit: common_args.append('--8bit-optim') # Resume if args.resume: common_args.extend(['--resume', args.resume]) # Skip data prep (déjà fait) common_args.append('--skip-data-prep') # Environnement env = os.environ.copy() if args.hf_token: env['HF_TOKEN'] = args.hf_token env['COGNET_WORKSPACE'] = WORKSPACE env['AICL_REPEAT'] = str(args.aicl_repeat) # CUDA optimizations env['CUDA_DEVICE_MAX_CONNECTIONS'] = '1' env['TORCH_NCCL_AVOID_RECORD_STREAMS'] = '1' if 'NCCL_P2P_LEVEL' not in env: env['NCCL_P2P_LEVEL'] = 'NVL' # Multi-GPU → torchrun if num_gpus > 1 and args.use_fsdp: common_args.append('--use-fsdp') cmd = [ sys.executable, '-m', 'torch.distributed.run', '--standalone', f'--nproc_per_node={num_gpus}', TRAIN_SCRIPT, ] + common_args print(f'\n[TRAIN] Lancement FSDP avec {num_gpus} GPUs via torchrun...') print(f'[TRAIN] Commande: {" ".join(cmd[:8])}... ({" ".join(common_args[:6])}...)') # Single GPU → python direct else: if args.compile_step: common_args.append('--compile-step') cmd = [sys.executable, TRAIN_SCRIPT] + common_args print(f'\n[TRAIN] Lancement single GPU...') print(f'[TRAIN] Commande: {" ".join(cmd[:4])}... ({" ".join(common_args[:6])}...)') # Lancement start_time = time.time() try: process = subprocess.Popen( cmd, env=env, cwd=WORKSPACE, stdout=sys.stdout, stderr=sys.stderr, ) # Gestion des signaux pour propager au sous-processus def forward_signal(signum, frame): process.send_signal(signum) signal.signal(signal.SIGTERM, forward_signal) signal.signal(signal.SIGINT, forward_signal) # Attendre la fin return_code = process.wait() elapsed = time.time() - start_time if return_code == 0: print(f'\n[TRAIN] Entraînement terminé avec succès! ({elapsed/3600:.1f}h)') else: print(f'\n[TRAIN] Entraînement terminé avec code {return_code} ({elapsed/3600:.1f}h)') return return_code == 0 except KeyboardInterrupt: print('\n[TRAIN] Interruption clavier — checkpoint sauvegardé par train_ultra.py') return True except Exception as e: print(f'\n[TRAIN] ERREUR: {e}') return False # ═══════════════════════════════════════════════════════════════════ # Soumission SLURM (optionnel) # ═══════════════════════════════════════════════════════════════════ def submit_slurm(args, num_gpus): """Soumet le job via SLURM — mais le script reste en Python!""" slurm_script = f"""#!/bin/bash #SBATCH --job-name=cognet-1b #SBATCH --partition=gpu #SBATCH --nodes=1 #SBATCH --ntasks-per-node={num_gpus} #SBATCH --cpus-per-task=8 #SBATCH --mem=256G #SBATCH --gres=gpu:{num_gpus} #SBATCH --time={args.time} #SBATCH --output=logs/cognet-%j.out #SBATCH --error=logs/cognet-%j.err cd {WORKSPACE} {sys.executable} run.py {" ".join(get_run_args_for_slurm(args))} """ script_path = os.path.join(WORKSPACE, '_slurm_submit.sh') os.makedirs(os.path.join(WORKSPACE, 'logs'), exist_ok=True) with open(script_path, 'w') as f: f.write(slurm_script) print(f'[SLURM] Soumission du job...') result = subprocess.run(['sbatch', script_path], capture_output=True, text=True) if result.returncode == 0: job_id = result.stdout.strip().split()[-1] print(f'[SLURM] Job soumis: {job_id}') print(f'[SLURM] Logs: logs/cognet-{job_id}.out') else: print(f'[SLURM] ERREUR: {result.stderr}') os.remove(script_path) def get_run_args_for_slurm(args): """Retourne les arguments Python pour la soumission SLURM.""" arg_list = [] if args.hf_token: arg_list.extend(['--hf-token', args.hf_token]) arg_list.extend(['--max-steps', str(args.max_steps)]) arg_list.extend(['--batch-size', str(args.batch_size)]) arg_list.extend(['--grad-accum', str(args.grad_accum)]) arg_list.extend(['--seq-len', str(args.seq_len)]) if args.no_compile: arg_list.append('--no-compile') if args.no_fsdp: arg_list.append('--no-fsdp') return arg_list # ═══════════════════════════════════════════════════════════════════ # Vérification des checkpoints # ═══════════════════════════════════════════════════════════════════ def check_existing_checkpoints(ckpt_dir): """Affiche les checkpoints existants.""" ckpt_path = Path(ckpt_dir) if not ckpt_path.exists(): return None latest = ckpt_path / 'cognet_1b_latest.pt' best = ckpt_path / 'cognet_1b_best.pt' final = ckpt_path / 'cognet_1b_final.pt' info = {} if latest.exists(): try: data = torch.load(str(latest), map_location='cpu', weights_only=False) info['latest_step'] = data.get('step', 0) info['latest_loss'] = data.get('loss', float('inf')) info['latest_path'] = str(latest) except Exception: pass if best.exists(): try: data = torch.load(str(best), map_location='cpu', weights_only=False) info['best_step'] = data.get('step', 0) info['best_loss'] = data.get('best_loss', float('inf')) info['best_path'] = str(best) except Exception: pass if final.exists(): info['final_path'] = str(final) return info # ═══════════════════════════════════════════════════════════════════ # Main # ═══════════════════════════════════════════════════════════════════ def main(): parser = argparse.ArgumentParser( description='CogNet-1B — Lanceur Python (remplace acil_submit.sh)', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Exemples: python run.py # Tout automatique python run.py --max-steps 50000 # 50k steps python run.py --hf-token hf_xxx # Avec token HF python run.py --resume ./checkpoints_1b/cognet_1b_latest.pt # Reprendre python run.py --prep-only # Seulement data prep python run.py --slurm --gpus 4 --time 72:00:00 # SLURM auto python run.py --no-fsdp # Single GPU """ ) # Config parser.add_argument('--model-size', type=str, default=DEFAULTS['model_size'], choices=['1b', '350m']) parser.add_argument('--batch-size', type=int, default=DEFAULTS['batch_size']) parser.add_argument('--grad-accum', type=int, default=DEFAULTS['grad_accum']) parser.add_argument('--seq-len', type=int, default=DEFAULTS['seq_len']) parser.add_argument('--max-lr', type=float, default=DEFAULTS['max_lr']) parser.add_argument('--min-lr', type=float, default=DEFAULTS['min_lr']) parser.add_argument('--warmup-steps', type=int, default=DEFAULTS['warmup_steps']) parser.add_argument('--max-steps', type=int, default=DEFAULTS['max_steps']) parser.add_argument('--ckpt-dir', type=str, default=DEFAULTS['ckpt_dir']) parser.add_argument('--data-dir', type=str, default=DEFAULTS['data_dir']) parser.add_argument('--save-every', type=int, default=DEFAULTS['save_every']) parser.add_argument('--eval-every', type=int, default=DEFAULTS['eval_every']) parser.add_argument('--log-every', type=int, default=DEFAULTS['log_every']) parser.add_argument('--weight-decay', type=float, default=DEFAULTS['weight_decay']) parser.add_argument('--grad-clip', type=float, default=DEFAULTS['grad_clip']) # Token & repos parser.add_argument('--hf-token', type=str, default=os.environ.get('HF_TOKEN', ''), help='HuggingFace API token') parser.add_argument('--aicl-repeat', type=int, default=10, help='Nombre de répétitions des données AICL') # Optimizations (activées par défaut) parser.add_argument('--no-compile', action='store_true', help='Désactiver torch.compile') parser.add_argument('--no-fsdp', action='store_true', help='Désactiver FSDP (single GPU)') parser.add_argument('--no-cuda-prefetch', action='store_true', help='Désactiver CUDA prefetch') parser.add_argument('--no-seq-warmup', action='store_true', help='Désactiver seq length warmup') parser.add_argument('--no-async-ckpt', action='store_true', help='Désactiver async checkpointing') parser.add_argument('--no-bf16', action='store_true', help='Désactiver BF16 (utiliser FP16)') parser.add_argument('--8bit', action='store_true', help='Activer 8-bit optimizer (bitsandbytes)') parser.add_argument('--compile-step', action='store_true', help='Compiler forward+backward ensemble') # Resume parser.add_argument('--resume', type=str, default=None, help='Chemin du checkpoint à reprendre') # Modes spéciaux parser.add_argument('--prep-only', action='store_true', help='Seulement préparer les données') parser.add_argument('--skip-data-prep', action='store_true', help='Sauter la préparation des données') parser.add_argument('--check-only', action='store_true', help='Seulement vérifier le setup') # SLURM parser.add_argument('--slurm', action='store_true', help='Soumettre via SLURM') parser.add_argument('--gpus', type=int, default=None, help='Nombre de GPUs pour SLURM') parser.add_argument('--time', type=str, default='72:00:00', help='Temps SLURM') args = parser.parse_args() # Dériver les flags booléens (inversés car les flags sont "no-*") args.bf16 = not args.no_bf16 args.compile = not args.no_compile args.use_fsdp = not args.no_fsdp args.cuda_prefetch = not args.no_cuda_prefetch args.seq_warmup = not args.no_seq_warmup args.async_ckpt = not args.no_async_ckpt args.use_8bit = getattr(args, '8bit', False) # ═══ Bannière ═══ print() print('╔══════════════════════════════════════════════════════════╗') print('║ CogNet-1B — Lanceur Python V2 ║') print('║ Les performances seront mesurées par benchmark ║') print('╚══════════════════════════════════════════════════════════╝') print() # ═══ Détection GPU ═══ num_gpus, gpus = detect_gpus() gpu_type = get_gpu_type(gpus) print(f'[GPU] {num_gpus} GPU(s) détecté(s):') for i, gpu in enumerate(gpus): print(f' GPU {i}: {gpu["name"]} ({gpu["vram_mb"]:.0f} MB VRAM)') print(f' Type: {gpu_type}') if num_gpus == 0: print('[GPU] ATTENTION: Aucun GPU détecté — entraînement sur CPU (très lent!)') print('[GPU] Vérifiez que nvidia-smi fonctionne et que CUDA est installé') # ═══ Vérification dépendances ═══ missing, optional = check_dependencies() if missing: print(f'\n[DEPS] Packages manquants: {", ".join(missing)}') response = input('[DEPS] Installer automatiquement? (o/n) [o] ').strip().lower() if response in ('', 'o', 'oui', 'y', 'yes'): install_dependencies(missing) else: print('[DEPS] Installation annulée. Installez manuellement:') print(f' pip install {" ".join(missing)}') sys.exit(1) if optional: print(f'[DEPS] Optionnels non installés: {", ".join(optional)}') # ═══ Vérification du script d'entraînement ═══ if not os.path.exists(TRAIN_SCRIPT): print(f'[ERREUR] Script d\'entraînement introuvable: {TRAIN_SCRIPT}') sys.exit(1) if not os.path.exists(os.path.join(WORKSPACE, 'cognet_1b_optimized.py')): print(f'[ERREUR] Modèle optimisé introuvable: cognet_1b_optimized.py') sys.exit(1) # ═══ Checkpoints existants ═══ ckpt_info = check_existing_checkpoints(args.ckpt_dir) if ckpt_info: print(f'\n[CKPT] Checkpoints existants dans {args.ckpt_dir}:') if 'latest_step' in ckpt_info: print(f' Latest: step {ckpt_info["latest_step"]}, loss={ckpt_info["latest_loss"]:.4f}') if 'best_step' in ckpt_info: print(f' Best: step {ckpt_info["best_step"]}, loss={ckpt_info["best_loss"]:.4f}') else: print(f'\n[CKPT] Aucun checkpoint existant') # ═══ Estimation du temps ═══ # NOTE: Le vrai benchmark sera fait par train_ultra.py au début du training. # Pas d'estimation fabriquée ici — les chiffres réels seront mesurés. if num_gpus > 0 and not args.check_only: effective_batch = args.batch_size * args.grad_accum * num_gpus print(f'\n[BENCH] Les performances seront mesurées par un vrai benchmark au démarrage.') print(f' GPU: {num_gpus}x {gpu_type}') print(f' Batch effectif: {effective_batch} ({args.batch_size} x {args.grad_accum} x {num_gpus} GPUs)') print(f' Le temps restant sera calculé à partir de la vitesse mesurée.') # ═══ Config finale ═══ print(f'\n[CONFIG] Configuration finale:') print(f' Model: CogNet-{args.model_size.upper()} (16 blocks, 8 channels, 384 ch_dim, 8192 ff)') print(f' Vocab: 136 (CharTokenizer)') print(f' Seq len: {args.seq_len}') print(f' Batch: {args.batch_size} x grad_accum={args.grad_accum} x GPUs={num_gpus} = {args.batch_size * args.grad_accum * num_gpus}') print(f' LR: {args.min_lr} → {args.max_lr}') print(f' Steps: {args.max_steps:,}') print(f' HF token: {"SET" if args.hf_token else "NOT SET"}') print(f' BF16: {args.bf16}') print(f' Compile: {args.compile}') print(f' FSDP: {args.use_fsdp} ({num_gpus} GPUs)') print(f' Prefetch: {args.cuda_prefetch}') print(f' SeqWarm: {args.seq_warmup}') print(f' AsyncCkpt:{args.async_ckpt}') print(f' 8-bit: {args.use_8bit}') # ═══ Check-only ═══ if args.check_only: print('\n[CHECK] Vérification terminée — tout est prêt!') return # ═══ SLURM ═══ if args.slurm: gpu_count = args.gpus or num_gpus or 4 submit_slurm(args, gpu_count) return # ═══ Data prep ═══ if args.prep_only: ok = prepare_data_python(args.data_dir, args.hf_token, skip=False) print('\n[DATA] Préparation terminée!' if ok else '\n[DATA] ÉCHEC!') return if not args.skip_data_prep: ok = prepare_data_python(args.data_dir, args.hf_token) if not ok: print('[DATA] ÉCHEC de la préparation des données!') response = input('[DATA] Continuer quand même? (o/n) [n] ').strip().lower() if response not in ('o', 'oui', 'y', 'yes'): sys.exit(1) # ═══ Entraînement ═══ print('\n' + '=' * 60) print(' DÉMARRAGE DE L\'ENTRAÎNEMENT') print('=' * 60) print(f' Début: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}') print('=' * 60 + '\n') success = launch_training(args, num_gpus) print('\n' + '=' * 60) if success: print(' ENTRAÎNEMENT TERMINÉ AVEC SUCCÈS') else: print(' ENTRAÎNEMENT TERMINÉ AVEC ERREURS') print(f' Fin: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}') print('=' * 60) # Vérifier le résultat final ckpt_info = check_existing_checkpoints(args.ckpt_dir) if ckpt_info and 'best_path' in ckpt_info: print(f'\n Meilleur checkpoint: {ckpt_info["best_path"]}') if 'best_loss' in ckpt_info: print(f' Meilleure loss: {ckpt_info["best_loss"]:.4f}') if not success: sys.exit(1) if __name__ == '__main__': main()