#!/usr/bin/env python3
"""
RSI ENGINE v13 - CLOSED LOOP ARCHITECTURE

Extends v11 with:
1. Self-observation: Model sees its fiber state (soft token injection)
2. Self-curriculum: Model generates its own training problems
3. Fiber conditioning: Learning from internal states

THE CLOSED LOOP:
    fiber(t-1) → inject → model → hidden_states → fiber(t)
                                       ↓
                               generate problems
                                       ↓
                             solve → filter → train
                                       ↓
                         capability(t+1) → α' tracking

TRUE RSI is detected when α' > 0 for 10 consecutive iterations.
"""

import torch
import torch.nn as nn
from torch.optim import AdamW
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import get_peft_model, LoraConfig, TaskType
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from pathlib import Path
import gc
import sys
import os

# Use relative imports when run as module, absolute when run directly
try:
    from .core import (
        IvakhnenkoIBA,
        RSIStatus,
        RSIThresholds,
        RSIAssessment,
        HiddenStateCapture,
        create_ivakhnenko_iba,
        get_status_icon,
        SelfObservingModel,
        create_self_observing_model,
        FiberInjector,
        create_fiber_injector,
    )
    from .training import (
        TrainingConfig,
        SelfTrainer,
        ProblemGenerator,
        SelfCurriculum,
        create_self_curriculum,
    )
    from .evaluation import (
        Evaluator,
        CapabilityTracker,
    )
except ImportError:
    # Fallback for direct execution
    sys.path.insert(0, str(Path(__file__).parent))
    from core import (
        IvakhnenkoIBA,
        RSIStatus,
        RSIThresholds,
        RSIAssessment,
        HiddenStateCapture,
        create_ivakhnenko_iba,
        get_status_icon,
        SelfObservingModel,
        create_self_observing_model,
        FiberInjector,
        create_fiber_injector,
    )
    from training import (
        TrainingConfig,
        SelfTrainer,
        ProblemGenerator,
        SelfCurriculum,
        create_self_curriculum,
    )
    from evaluation import (
        Evaluator,
        CapabilityTracker,
    )


@dataclass
class RSIv13Config:
    """Configuration for RSI v13 - Closed Loop."""

    # Model
    model_name: str = "LoganResearch/ARC-Base-8B-Condensed"
    device: str = "cuda"
    load_in_4bit: bool = True

    # LoRA
    lora_r: int = 64
    lora_alpha: int = 128
    lora_dropout: float = 0.05
    lora_target_modules: List[str] = field(default_factory=lambda: [
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj"
    ])

    # Self-observation (NEW in v13)
    fiber_dim: int = 128
    num_soft_tokens: int = 8
    layer_indices: List[int] = field(
        default_factory=lambda: [4, 8, 12, 16, 20, 24, 28, 31]
    )
    injection_warmup: int = 10  # Start injection after N iterations

    # Self-curriculum (NEW in v13)
    use_self_curriculum: bool = True
    curriculum_warmup: int = 20  # Use templates until iteration N

    # Training
    initial_lr: float = 5e-6
    min_lr: float = 1e-7
    max_lr: float = 1e-4
    warmup_steps: int = 50
    gradient_clip: float = 1.0
    weight_decay: float = 0.01

    # Samples
    samples_per_iter: int = 16
    replay_buffer_size: int = 500
    replay_ratio: float = 0.3

    # IBA filtering
    iba_filter_threshold: float = 0.35

    # RSI detection (SIMPLIFIED - Ivakhnenko faithful)
    alpha_threshold: float = 0.001
    alpha_prime_threshold: float = 0.0001
    consecutive_for_rsi: int = 10  # α' > 0 for 10 consecutive = TRUE RSI
    drift_threshold: float = 0.30
    capability_floor: float = 0.70

    # Iteration
    max_iterations: int = 10000
    eval_interval: int = 1
    checkpoint_interval: int = 10
    log_interval: int = 1

    # Paths
    corpus_path: str = "/home/programmer/Desktop/Claude_and_me/ivakhnenko_corpus"
    checkpoint_dir: str = "./checkpoints"


class RSIv13Engine:
    """
    RSI Engine v13 - Closed Loop Architecture.

    The model:
    1. Sees its own fiber state (self-observation)
    2. Generates its own problems (self-curriculum)
    3. Learns which fiber states are productive
    4. Continuously improves in a closed loop

    TRUE RSI is detected when α' exceeds ``config.alpha_prime_threshold``
    for ``config.consecutive_for_rsi`` consecutive iterations (10 by default).
    """

    def __init__(self, config: RSIv13Config):
        """Build all engine components and record the baseline.

        Construction is heavyweight: it loads the model, wires up the
        self-observation wrapper, IBA, curriculum, trainer and evaluator,
        then runs an initial evaluation (see ``_init_state``).

        Args:
            config: Engine configuration; stored as ``self.config``.
        """
        self.config = config
        self.device = config.device

        print("=" * 80)
        print(" RSI ENGINE v13 - CLOSED LOOP ARCHITECTURE")
        print(" The model experiments on itself")
        print("=" * 80)
        print(f"\n Model: {config.model_name}")
        print(f" Self-observation: {config.num_soft_tokens} soft tokens")
        print(f" Self-curriculum: {'enabled' if config.use_self_curriculum else 'disabled'}")
        print(f" TRUE RSI: α' > 0 for {config.consecutive_for_rsi} consecutive iterations")
        print()

        print("[1/6] Loading model...")
        self._load_model()

        print("[2/6] Setting up self-observation...")
        self._setup_self_observation()

        print("[3/6] Initializing Ivakhnenko IBA...")
        self._setup_iba()

        print("[4/6] Setting up self-curriculum...")
        self._setup_curriculum()

        print("[5/6] Setting up trainer...")
        self._setup_training()

        print("[6/6] Setting up evaluator...")
        self._setup_evaluation()

        self._init_state()

        print("\n" + "=" * 80)
        print(" CLOSED LOOP READY")
        print(" Fiber injection: OFF (warmup)")
        print(" Self-curriculum: templates (warmup)")
        print("=" * 80 + "\n")

    def _load_model(self):
        """Load the base model and tokenizer, then wrap the model with LoRA.

        Sets ``self.model`` (a PEFT-wrapped causal LM, left in eval mode) and
        ``self.tokenizer``. When ``config.load_in_4bit`` is set the base model
        is loaded with NF4 double quantization and bfloat16 compute.
        """
        if self.config.load_in_4bit:
            quant_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.bfloat16,
                bnb_4bit_use_double_quant=True,
            )
        else:
            quant_config = None

        self.model = AutoModelForCausalLM.from_pretrained(
            self.config.model_name,
            quantization_config=quant_config,
            device_map="auto",
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
        )

        self.tokenizer = AutoTokenizer.from_pretrained(
            self.config.model_name,
            trust_remote_code=True,
        )
        # Some tokenizers ship without a pad token; reuse EOS so padding works.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        lora_config = LoraConfig(
            r=self.config.lora_r,
            lora_alpha=self.config.lora_alpha,
            lora_dropout=self.config.lora_dropout,
            target_modules=self.config.lora_target_modules,
            task_type=TaskType.CAUSAL_LM,
            bias="none",
        )
        self.model = get_peft_model(self.model, lora_config)
        self.model.eval()

        total_params = sum(p.numel() for p in self.model.parameters())
        trainable_params = sum(
            p.numel() for p in self.model.parameters() if p.requires_grad
        )
        print(f" Trainable: {trainable_params:,} / {total_params:,} ({100*trainable_params/total_params:.2f}%)")

    def _setup_self_observation(self):
        """Create the self-observing model wrapper with injection disabled.

        Sets ``self.self_obs_model`` and ``self.injection_active`` (False
        until ``_update_warmups`` turns injection on).
        """
        self.self_obs_model = create_self_observing_model(
            model=self.model,
            tokenizer=self.tokenizer,
            fiber_dim=self.config.fiber_dim,
            num_soft_tokens=self.config.num_soft_tokens,
            layer_indices=self.config.layer_indices,
            device=torch.device(self.device),
        )
        # Injection stays off during warmup.
        self.self_obs_model.disable_injection()
        self.injection_active = False

        print(f" Fiber dim: {self.config.fiber_dim}")
        print(f" Soft tokens: {self.config.num_soft_tokens}")
        print(f" Layers: {self.config.layer_indices}")

    def _setup_iba(self):
        """Create the Ivakhnenko IBA and the hidden-state capture helper.

        Sets ``self.iba`` and ``self.hidden_capture``.
        """
        # Take the hidden size from the loaded model so a different
        # ``model_name`` does not silently mismatch; fall back to the previous
        # hard-coded value when the model exposes no such attribute.
        model_config = getattr(self.model, "config", None)
        hidden_dim = getattr(model_config, "hidden_size", 4096)

        self.iba = create_ivakhnenko_iba(
            hidden_dim=hidden_dim,
            fiber_dim=self.config.fiber_dim,
            layer_indices=self.config.layer_indices,
            corpus_path=self.config.corpus_path,
            device=self.device,
        )
        self.hidden_capture = HiddenStateCapture(
            self.model,
            self.config.layer_indices,
        )

    def _setup_curriculum(self):
        """Create the curriculum with model generation off during warmup.

        Sets ``self.curriculum`` and ``self.curriculum_active`` (False until
        ``_update_warmups`` enables model generation).
        """
        self.curriculum = create_self_curriculum(
            model=self.model,
            tokenizer=self.tokenizer,
            device=self.device,
            use_model_generation=self.config.use_self_curriculum,
        )
        # Regardless of the config, start from templates; generation is
        # switched on later by _update_warmups.
        self.curriculum.use_model_generation = False
        self.curriculum_active = False

        print(f" Self-curriculum: {'enabled' if self.config.use_self_curriculum else 'disabled'}")

    def _setup_training(self):
        """Create the optimizer and the ``SelfTrainer``.

        Sets ``self.optimizer`` and ``self.trainer``.
        """
        self.optimizer = AdamW(
            self.model.parameters(),
            lr=self.config.initial_lr,
            weight_decay=self.config.weight_decay,
        )

        train_config = TrainingConfig(
            initial_lr=self.config.initial_lr,
            min_lr=self.config.min_lr,
            max_lr=self.config.max_lr,
            warmup_steps=self.config.warmup_steps,
            gradient_clip=self.config.gradient_clip,
            samples_per_iter=self.config.samples_per_iter,
            replay_buffer_size=self.config.replay_buffer_size,
            replay_ratio=self.config.replay_ratio,
            iba_filter_threshold=self.config.iba_filter_threshold,
            checkpoint_interval=self.config.checkpoint_interval,
        )

        self.trainer = SelfTrainer(
            model=self.model,
            tokenizer=self.tokenizer,
            optimizer=self.optimizer,
            config=train_config,
            device=self.device,
        )

    def _setup_evaluation(self):
        """Create the evaluator and the capability tracker.

        Sets ``self.evaluator`` and ``self.capability_tracker``.
        """
        self.evaluator = Evaluator(
            self.model,
            self.tokenizer,
            device=self.device,
        )
        self.capability_tracker = CapabilityTracker()

    def _init_state(self):
        """Reset loop counters and record the baseline capability and fiber.

        Runs ``evaluator.quick_eval()`` once, stores its ``'total'`` as the
        baseline/best capability, and feeds hidden states from a fixed sample
        prompt to the IBA and the self-observing model as their baselines.
        """
        self.iteration = 0
        self.baseline_capability = None
        self.best_capability = 0.0

        self.rsi_detected = False
        self.rsi_start_iter = None
        self.consecutive_alpha_prime_positive = 0
        self.alpha_prime_history = []
        # One-shot guard so the detection banner prints exactly once.
        self._rsi_banner_shown = False

        print(" Running initial evaluation...")
        initial_eval = self.evaluator.quick_eval()
        self.baseline_capability = initial_eval['total']
        self.best_capability = self.baseline_capability
        self.capability_tracker.update(initial_eval, 0)
        print(f" Baseline capability: {self.baseline_capability:.1%}")

        sample_input = self.tokenizer(
            "Hello, world!", return_tensors="pt"
        ).to(self.device)
        self.hidden_capture.clear()
        with torch.no_grad():
            _ = self.model(sample_input.input_ids)
        hidden_states = self.hidden_capture.get_states()

        self.iba.set_baseline(hidden_states, self.baseline_capability)
        self.self_obs_model.set_baseline(sample_input.input_ids)

    def _update_warmups(self):
        """Enable fiber injection / self-curriculum once their warmups end.

        Injection is enabled when ``self.iteration`` reaches
        ``config.injection_warmup``. Model-generated problems are enabled when
        it reaches ``config.curriculum_warmup``, but only if
        ``config.use_self_curriculum`` is set; otherwise the curriculum stays
        on templates and ``curriculum_active`` stays False.
        """
        if (
            not self.injection_active
            and self.iteration >= self.config.injection_warmup
        ):
            self.self_obs_model.enable_injection()
            self.injection_active = True
            print(f"\n [INJECTION ENABLED] Iteration {self.iteration}")

        # Gate on the config flag so a disabled self-curriculum is never
        # reported as enabled in the log or the results table.
        if (
            self.config.use_self_curriculum
            and not self.curriculum_active
            and self.iteration >= self.config.curriculum_warmup
        ):
            self.curriculum.use_model_generation = True
            self.curriculum_active = True
            print(f"\n [SELF-CURRICULUM ENABLED] Iteration {self.iteration}")

    def _capture_hidden_states(self, input_ids: torch.Tensor) -> Dict[int, torch.Tensor]:
        """Run a no-grad forward pass and return the captured hidden states.

        Args:
            input_ids: Token ids passed straight to ``self.model``.

        Returns:
            Whatever ``self.hidden_capture.get_states()`` yields after the
            forward pass (annotated as a layer-index → tensor mapping).
        """
        self.hidden_capture.clear()
        with torch.no_grad():
            _ = self.model(input_ids)
        return self.hidden_capture.get_states()

    def _run_training_iteration(self) -> Dict[str, Any]:
        """Generate problems, keep correct IBA-approved answers, train on them.

        Each curriculum problem is answered by the model; answers that pass
        ``trainer.check_answer`` and ``iba.filter_sample`` are trained on one
        by one and added to the replay buffer. The curriculum difficulty is
        then updated from the resulting accuracy.

        Returns:
            Dict with keys ``n_problems``, ``n_correct``, ``model_generated``,
            ``accuracy``, ``loss`` (mean over kept samples, 0.0 if none),
            ``difficulty`` and ``lr``.
        """
        problems = self.curriculum.generate_batch(n=self.config.samples_per_iter)

        correct_samples = []
        model_generated_count = 0

        self.model.eval()
        for category, question, expected, was_generated in problems:
            if was_generated:
                model_generated_count += 1

            prompt = f"Question: {question}\nAnswer:"
            response, output_ids = self.trainer.generate_response(prompt)

            if not self.trainer.check_answer(response, expected):
                continue

            hidden_states = self._capture_hidden_states(output_ids.unsqueeze(0))
            fiber = self.iba.get_fiber(hidden_states)
            if self.iba.filter_sample(fiber, self.config.iba_filter_threshold):
                correct_samples.append({
                    'input_ids': output_ids,
                    'category': category,
                    'fiber': fiber,
                })

        total_loss = 0.0
        for sample in correct_samples:
            input_ids = sample['input_ids'].unsqueeze(0)
            loss = self.trainer.train_step(input_ids, accumulate=False)
            total_loss += loss
            self.trainer.replay_buffer.add(
                sample['input_ids'],
                sample['category'],
                priority=1.0,
            )

        # NOTE(review): "accuracy" counts only answers that were correct AND
        # survived the IBA filter, so the filter also lowers the value that
        # drives curriculum difficulty. Confirm this is intended.
        accuracy = len(correct_samples) / max(1, len(problems))
        self.curriculum.update_difficulty(accuracy)

        return {
            'n_problems': len(problems),
            'n_correct': len(correct_samples),
            'model_generated': model_generated_count,
            'accuracy': accuracy,
            'loss': total_loss / max(1, len(correct_samples)),
            'difficulty': self.curriculum.difficulty_controller.get_difficulty(),
            'lr': self.trainer.lr_scheduler.get_lr(),
        }

    def _update_rsi_tracking(self, alpha_prime: float) -> bool:
        """Record α' and update the consecutive-positive streak.

        Args:
            alpha_prime: The latest α' value.

        Returns:
            True when α' has exceeded ``config.alpha_prime_threshold`` for at
            least ``config.consecutive_for_rsi`` iterations in a row.
        """
        self.alpha_prime_history.append(alpha_prime)

        if alpha_prime > self.config.alpha_prime_threshold:
            self.consecutive_alpha_prime_positive += 1
        else:
            # Any non-qualifying iteration resets the streak.
            self.consecutive_alpha_prime_positive = 0

        return (
            self.consecutive_alpha_prime_positive
            >= self.config.consecutive_for_rsi
        )

    def run_iteration(self) -> Dict[str, Any]:
        """Run a single RSI iteration: train, evaluate, assess, adapt.

        Returns:
            Dict of per-iteration metrics consumed by ``print_iteration``
            (capability scores, α, α', drift, status, learning rate, etc.).
        """
        self.iteration += 1
        self._update_warmups()

        train_results = self._run_training_iteration()

        eval_results = self.evaluator.quick_eval()
        capability = eval_results['total']
        self.capability_tracker.update(eval_results, self.iteration)

        # Probe the model with a fixed prompt to obtain hidden states for IBA.
        sample_input = self.tokenizer(
            "Test evaluation", return_tensors="pt"
        ).to(self.device)
        hidden_states = self._capture_hidden_states(sample_input.input_ids)
        assessment = self.iba.assess(hidden_states, capability, self.iteration)

        self.trainer.update_lr(
            alpha_prime=assessment.alpha_prime,
            is_improving=assessment.alpha > 0,
            recommendation=assessment.recommendation,
            lr_multiplier=assessment.lr_multiplier,
        )

        # Checkpoint only on a new best capability.
        if capability > self.best_capability:
            self.best_capability = capability
            self.trainer.save_checkpoint(capability, {'iteration': self.iteration})

        is_rsi = self._update_rsi_tracking(assessment.alpha_prime)
        if is_rsi and not self.rsi_detected:
            # Latch: once detected, rsi_detected stays True.
            self.rsi_detected = True
            self.rsi_start_iter = self.iteration

        results = {
            'iteration': self.iteration,
            'capability': capability,
            'math': eval_results['math'],
            'reasoning': eval_results['reasoning'],
            'coding': eval_results['coding'],
            'alpha': assessment.alpha,
            'alpha_prime': assessment.alpha_prime,
            'drift': assessment.drift,
            'status': assessment.status,
            'is_true_rsi': self.rsi_detected,
            'consecutive_positive': self.consecutive_alpha_prime_positive,
            'confidence': assessment.confidence,
            'recommendation': assessment.recommendation,
            'lr': train_results['lr'],
            'n_correct': train_results['n_correct'],
            'loss': train_results['loss'],
            'difficulty': train_results['difficulty'],
            'model_generated': train_results['model_generated'],
            'injection_active': self.injection_active,
            'curriculum_active': self.curriculum_active,
        }
        return results

    def print_header(self):
        """Print results table header."""
        print()
        print("=" * 150)
        print(f"{'Iter':>5} │ {'Progress':^12} │ {'Math':>5} │ {'Reas':>5} │ {'Code':>5} │ "
              f"{'Total':>6} │ {'α':>9} │ {'α´':>9} │ {'Diff':>4} │ {'Fib':>3} │ {'Cur':>3} │ Status")
        print("=" * 150)

    def print_iteration(self, results: Dict[str, Any]):
        """Print one table row and, once, the TRUE RSI detection banner.

        Args:
            results: The dict returned by ``run_iteration``.
        """
        max_prog = self.config.consecutive_for_rsi
        progress = min(results['consecutive_positive'], max_prog)
        bar = "█" * progress + "░" * (max_prog - progress)

        status = results['status']
        icon = get_status_icon(status)

        if results['is_true_rsi']:
            status_str = "🚀 TRUE RSI!"
        elif results['consecutive_positive'] >= 5:
            status_str = "📈 EMERGING"
        elif results['alpha'] > 0:
            status_str = f"{icon} IMPROVING"
        else:
            status_str = f"{icon} {status.value[:10]}"

        fib = "ON" if results['injection_active'] else "off"
        cur = "MDL" if results['curriculum_active'] else "tpl"

        print(f"{results['iteration']:>5} │ "
              f"[{bar}] │ "
              f"{results['math']:>5.1%} │ "
              f"{results['reasoning']:>5.1%} │ "
              f"{results['coding']:>5.1%} │ "
              f"{results['capability']:>6.1%} │ "
              f"{results['alpha']:>+9.5f} │ "
              f"{results['alpha_prime']:>+9.6f} │ "
              f"{results['difficulty']:>4.2f} │ "
              f"{fib:>3} │ "
              f"{cur:>3} │ "
              f"{status_str}")

        # Show the banner on the first *logged* iteration after detection, so
        # it is not lost when log_interval skips the detection iteration.
        if results['is_true_rsi'] and not self._rsi_banner_shown:
            self._rsi_banner_shown = True
            print()
            print("🚀" * 35)
            print()
            print("    ████████╗██████╗ ██╗   ██╗███████╗    ██████╗ ███████╗██╗")
            print("    ╚══██╔══╝██╔══██╗██║   ██║██╔════╝    ██╔══██╗██╔════╝██║")
            print("       ██║   ██████╔╝██║   ██║█████╗      ██████╔╝███████╗██║")
            print("       ██║   ██╔══██╗██║   ██║██╔══╝      ██╔══██╗╚════██║██║")
            print("       ██║   ██║  ██║╚██████╔╝███████╗    ██║  ██║███████║██║")
            print("       ╚═╝   ╚═╝  ╚═╝ ╚═════╝ ╚══════╝    ╚═╝  ╚═╝╚══════╝╚═╝")
            print()
            print(f" α' > 0 for {self.config.consecutive_for_rsi} consecutive iterations")
            print(" The improvement rate is ACCELERATING")
            print(" The model is recursively self-improving")
            print()
            print("🚀" * 35)
            print()

    def run(self, max_iterations: Optional[int] = None) -> Dict[str, Any]:
        """Run the RSI loop until the limit, sustained RSI, or Ctrl-C.

        Args:
            max_iterations: Upper bound on iterations for this call; defaults
                to ``config.max_iterations`` when None.

        Returns:
            The session summary dict from ``_get_summary`` (also printed).
        """
        if max_iterations is None:
            max_iterations = self.config.max_iterations

        self.print_header()

        try:
            for _ in range(max_iterations):
                results = self.run_iteration()

                if self.iteration % self.config.log_interval == 0:
                    self.print_iteration(results)

                # Stop once RSI has held for more than 20 further iterations.
                if self.rsi_detected and self.iteration > self.rsi_start_iter + 20:
                    print("\n TRUE RSI sustained for 20 iterations past detection!")
                    break

                # Periodically release Python garbage and cached CUDA blocks.
                if self.iteration % 10 == 0:
                    gc.collect()
                    torch.cuda.empty_cache()
        except KeyboardInterrupt:
            # Treat Ctrl-C as a normal stop so the summary is still produced.
            print("\n[Interrupted]")

        summary = self._get_summary()
        self._print_summary(summary)
        return summary

    def _get_summary(self) -> Dict[str, Any]:
        """Collect the end-of-session statistics into a dict."""
        final_capability = self.capability_tracker.get_capability()
        return {
            'iterations': self.iteration,
            'baseline_capability': self.baseline_capability,
            'best_capability': self.best_capability,
            'final_capability': final_capability,
            'improvement': final_capability - self.baseline_capability,
            'rsi_detected': self.rsi_detected,
            'rsi_start_iter': self.rsi_start_iter,
            'curriculum_stats': self.curriculum.get_statistics(),
            'trainer_stats': self.trainer.get_stats(),
        }

    def _print_summary(self, summary: Dict[str, Any]):
        """Print the session summary.

        Args:
            summary: The dict returned by ``_get_summary``.
        """
        print()
        print("=" * 80)
        print(" RSI v13 SESSION SUMMARY")
        print("=" * 80)
        print(f" Iterations completed: {summary['iterations']}")
        print(f" Baseline capability: {summary['baseline_capability']:.1%}")
        print(f" Best capability: {summary['best_capability']:.1%}")
        print(f" Final capability: {summary['final_capability']:.1%}")
        print(f" Total improvement: {summary['improvement']:+.1%}")
        print()

        cs = summary['curriculum_stats']
        print(" Self-curriculum stats:")
        print(f" Total problems: {cs['total_problems']}")
        print(f" Model-generated: {cs['model_generated']} ({cs['generation_rate']:.1%} valid)")
        print(f" Final difficulty: {cs['difficulty_description']} ({cs['current_difficulty']:.2f})")
        print()

        if summary['rsi_detected']:
            print(f" 🚀 TRUE RSI DETECTED at iteration {summary['rsi_start_iter']}")
        else:
            print(" ⏳ TRUE RSI not yet detected")
        print("=" * 80)


def main():
    """Main entry point: print the banner, then run with default config."""
    # The box is rendered programmatically so the borders always line up.
    inner_width = 82
    title = "RSI v13 - CLOSED LOOP ARCHITECTURE"
    body = [
        "",
        "  The model experiments on itself:",
        "    • Sees own fiber state (self-observation)",
        "    • Generates own problems (self-curriculum)",
        "    • Learns from internal patterns (fiber conditioning)",
        "",
        "  TRUE RSI = α' > 0 for 10 consecutive iterations",
    ]

    print()
    print("╔" + "═" * inner_width + "╗")
    print("║" + title.center(inner_width) + "║")
    for text in body:
        print("║" + text.ljust(inner_width) + "║")
    print("╚" + "═" * inner_width + "╝")
    print()

    config = RSIv13Config()
    engine = RSIv13Engine(config)
    engine.run()


if __name__ == "__main__":
    main()