🧠 Full weight release: 9 probes × 3 architectures + production adapter + training code
297244f
#!/usr/bin/env python3
"""
LOOP 4 → RSI CEILING BREAKTHROUGH TEST (MEMORY-OPTIMIZED)
==========================================================
The critical experiment: does tokenization co-evolution break the RSI ceiling?

Key insight: adaptation training must EXERCISE THE NEW TOKENS. We generate
text dense with the merged patterns, re-encode it with the new tokenizer,
and train the model to predict the merged tokens.

MEMORY OPTIMIZED:
- 4-bit quantization (~5GB model)
- Batch size 1 with gradient accumulation
- Reduced sequence lengths
- Aggressive memory cleanup

Expected VRAM: ~12-14GB
Expected runtime: 45-75 minutes

Pipeline:
1. Load Loop 4 results (merge candidates)
2. Resize embeddings for the new tokens
3. TARGETED fine-tuning on text dense with merged patterns
4. Run extended RSI (aim for >5 iterations)
5. Measure whether the ceiling has moved

Author: Logan Napolitano
Date: January 2026
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import json
import os
import gc
import re
import warnings

from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
    BitsAndBytesConfig,
)
from datasets import Dataset
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training
from tqdm import tqdm

warnings.filterwarnings("ignore")
@dataclass
class BreakthroughConfig:
    """Experiment hyperparameters; constructed with keyword overrides in main()."""
    model_path: str = "."
    output_dir: str = "loop4_breakthrough"
    device: str = "cuda"

    # Tokenizer modification
    loop4_results_path: str = "loop4_full_results/loop4_full_results.json"
    top_k_merges: int = 10  # Reduced

    # Adaptation fine-tuning (targeted)
    adaptation_samples: int = 150  # Reduced
    adaptation_steps: int = 100  # Reduced
    adaptation_lr: float = 2e-5
    adaptation_batch_size: int = 1  # Reduced
    gradient_accumulation: int = 8  # Increased to compensate
    min_pattern_density: float = 0.08

    # RSI settings
    max_rsi_iterations: int = 10
    rsi_micro_steps: int = 15  # Reduced
    rsi_lr: float = 1e-5
    quality_threshold: float = 0.92

    # Evaluation
    eval_samples: int = 10  # Reduced
class EmbeddingResizer:
    """Handles resizing model embeddings for new tokens."""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.original_vocab_size = len(tokenizer)
        # Keep a pristine copy of the tokenizer so merged tokens can later be
        # decomposed into their original component tokens.
        self.original_tokenizer = AutoTokenizer.from_pretrained(
            tokenizer.name_or_path,
            local_files_only=True,
        )

    def add_tokens_and_resize(self, new_tokens: List[str]) -> Tuple[int, List[str]]:
        """
        Add new tokens and resize embeddings.
        Returns: (num_added, list of added tokens)
        """
        existing_vocab = set(self.tokenizer.get_vocab().keys())
        tokens_to_add = [t for t in new_tokens if t not in existing_vocab]
        if not tokens_to_add:
            print("All tokens already in vocabulary")
            return 0, []
        print(f"Adding {len(tokens_to_add)} new tokens...")
        for t in tokens_to_add:
            print(f"  + {repr(t)}")
        num_added = self.tokenizer.add_tokens(tokens_to_add)
        print(f"Vocabulary: {self.original_vocab_size} → {len(self.tokenizer)}")
        self.model.resize_token_embeddings(len(self.tokenizer))
        self._initialize_new_embeddings(tokens_to_add)
        return num_added, tokens_to_add

    def _initialize_new_embeddings(self, new_tokens: List[str]):
        """Initialize each new embedding as the average of its component tokens."""
        embed_weight = self.model.get_input_embeddings().weight
        for token in new_tokens:
            new_id = self.tokenizer.convert_tokens_to_ids(token)
            # Get component IDs from the original (pre-merge) tokenizer
            component_ids = self.original_tokenizer.encode(token, add_special_tokens=False)
            if component_ids:
                with torch.no_grad():
                    component_embeds = embed_weight[component_ids]
                    embed_weight[new_id] = component_embeds.mean(dim=0)
                print(f"  '{token}' initialized from {len(component_ids)} components")
class TargetedDataGenerator:
    """
    Generates training data DENSE with the merged token patterns.
    This is the key insight: train on text that exercises the new vocabulary.
    """

    def __init__(self, model, tokenizer, merged_tokens: List[str], config: BreakthroughConfig):
        self.model = model
        self.tokenizer = tokenizer
        self.merged_tokens = merged_tokens
        self.config = config
        # Build pattern matchers for each merged token
        self.patterns = []
        for token in merged_tokens:
            # Escape special regex chars
            escaped = re.escape(token)
            self.patterns.append(escaped)
        # Combined pattern
        if self.patterns:
            self.combined_pattern = re.compile("|".join(self.patterns))
        else:
            self.combined_pattern = None

    def count_pattern_matches(self, text: str) -> int:
        """Count how many merged token patterns appear in the text."""
        if not self.combined_pattern:
            return 0
        return len(self.combined_pattern.findall(text))

    def compute_pattern_density(self, text: str) -> float:
        """Compute what fraction of the text is covered by merged patterns."""
        if not text:
            return 0.0
        matches = self.count_pattern_matches(text)
        # Rough estimate: each match covers ~5 chars on average
        coverage = (matches * 5) / len(text)
        return min(coverage, 1.0)
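    # Worked example of the density heuristic: a 100-char sample containing
    # two pattern matches scores (2 * 5) / 100 = 0.10, just above the default
    # min_pattern_density of 0.08, so it would survive the filtering below.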
    def generate_dense_text(self, n_samples: int) -> List[str]:
        """
        Generate text that naturally contains many merged token patterns.

        Strategy:
        1. Use prompts that encourage the patterns (sentence starters, connectives)
        2. Filter for high pattern density
        3. Augment with explicit pattern injection if needed
        """
        print("Generating pattern-dense training data...")
        # Prompts designed to elicit the patterns (". The", ", and", ". In", ". It", ", the", etc.)
        dense_prompts = [
            # These encourage the ". The" pattern
            "The experiment showed remarkable results. The data clearly indicated",
            "She opened the door carefully. The room inside was dark. The air felt cold. The silence was complete.",
            "Scientists made a breakthrough. The discovery changes everything. The implications are vast.",
            # These encourage the ", and" and ", the" patterns
            "The team worked hard, and the results showed improvement, and the project succeeded.",
            "He studied mathematics, physics, and chemistry, and he excelled in all subjects.",
            "We need food, water, and shelter, and the supplies must arrive soon.",
            # These encourage the ". In" pattern
            "The city grew rapidly. In the downtown area, new buildings appeared. In the suburbs, families settled.",
            "Change happens slowly. In the beginning, few noticed. In time, everyone understood.",
            # These encourage the ". It" pattern
            "The machine hummed quietly. It processed data continuously. It never stopped working.",
            "The algorithm converged. It found the optimal solution. It exceeded expectations.",
            # These encourage the ". This" pattern
            "The theory was revolutionary. This changed how scientists thought. This led to new discoveries.",
            "The problem seemed impossible. This made the solution more remarkable. This proved the method worked.",
            # Dense combinations
            "The research began in January. The team collected data, and the analysis revealed patterns. In the first phase, the results were promising. It became clear that the hypothesis was correct. This validated the entire approach, and the project moved forward.",
            "The algorithm processes input. The output depends on parameters, and the system optimizes continuously. In each iteration, the model improves. It learns from errors. This creates a feedback loop, and the performance increases.",
            "The forest stretched endlessly. The trees stood tall, and the leaves rustled softly. In the clearing, sunlight streamed down. It illuminated the path. This was the way forward, and the journey continued.",
        ]
        # Additional templates that force patterns (currently unused here;
        # _inject_patterns below carries its own template set)
        templates = [
            "The {noun} was {adj}. The {noun2} seemed {adj2}. In the {place}, {event}. It was {description}. This meant {conclusion}, and the {outcome}.",
            "{Statement}. The evidence was clear. In retrospect, {reflection}. It all made sense. This {insight}, and {result}.",
            "The process began. The first step involved {action}. In this phase, {detail}. It required {requirement}. This ensured {guarantee}, and the {final}.",
        ]
        nouns = ["system", "approach", "method", "solution", "discovery", "pattern", "structure", "concept"]
        adjs = ["remarkable", "significant", "important", "crucial", "fundamental", "essential"]
        places = ["beginning", "end", "middle", "process", "analysis", "study"]

        texts = []
        attempts = 0
        max_attempts = n_samples * 10

        # First, use the dense prompts and generate continuations
        for prompt in dense_prompts:
            if len(texts) >= n_samples:
                break
            try:
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)
                with torch.no_grad():
                    outputs = self.model.generate(
                        inputs.input_ids,
                        max_new_tokens=80,  # Reduced
                        temperature=0.8,
                        do_sample=True,
                        pad_token_id=self.tokenizer.eos_token_id,
                        repetition_penalty=1.1,
                    )
                text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                density = self.compute_pattern_density(text)
                if density >= self.config.min_pattern_density:
                    texts.append(text)
            except Exception:
                continue

        # Memory cleanup
        torch.cuda.empty_cache()
        gc.collect()

        # Generate more with varied prompts
        while len(texts) < n_samples and attempts < max_attempts:
            attempts += 1
            # Random dense prompt
            base = np.random.choice(dense_prompts)
            try:
                inputs = self.tokenizer(base, return_tensors="pt").to(self.config.device)
                with torch.no_grad():
                    outputs = self.model.generate(
                        inputs.input_ids,
                        max_new_tokens=60,  # Reduced
                        temperature=0.9,
                        do_sample=True,
                        top_p=0.95,
                        pad_token_id=self.tokenizer.eos_token_id,
                        repetition_penalty=1.1,
                    )
                text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                density = self.compute_pattern_density(text)
                if density >= self.config.min_pattern_density * 0.8:  # Slightly relaxed
                    texts.append(text)
            except Exception:
                continue
            if attempts % 50 == 0:
                print(f"  Generated {len(texts)}/{n_samples} samples ({attempts} attempts)")

        # If we still need more, inject patterns into template text
        if len(texts) < n_samples:
            print("  Augmenting with pattern injection...")
            texts.extend(self._inject_patterns(n_samples - len(texts)))
        return texts[:n_samples]
    def _inject_patterns(self, n_samples: int) -> List[str]:
        """Inject merged patterns into template text."""
        templates = [
            "The {0} was significant. The {1} followed naturally. In the {2}, everything changed. It was clear that {3}. This meant {4}, and the outcome was {5}.",
            "Research showed {0}. The findings were {1}. In particular, the {2} demonstrated {3}. It proved that {4}. This {5}, and {6}.",
            "The system processed {0}. The algorithm computed {1}, and the results showed {2}. In each iteration, the {3} improved. It optimized {4}. This created {5}, and the {6}.",
        ]
        fillers = [
            "the data", "the results", "the analysis", "the model", "the approach",
            "important", "significant", "remarkable", "essential", "fundamental",
            "process", "method", "system", "framework", "structure",
            "the hypothesis held", "the theory worked", "the method succeeded",
            "improvement", "progress", "advancement", "efficiency", "performance",
            "a breakthrough", "new understanding", "better outcomes", "clear benefits",
            "conclusion validated the approach", "study confirmed expectations",
        ]
        texts = []
        for _ in range(n_samples):
            template = np.random.choice(templates)
            # Draw 10 fillers; str.format ignores positional args beyond the
            # template's highest placeholder index
            fills = np.random.choice(fillers, size=10, replace=True)
            try:
                text = template.format(*fills)
                texts.append(text)
            except Exception:
                continue
        return texts
    def create_dataset(self, n_samples: int) -> Dataset:
        """Create a dataset of pattern-dense text."""
        texts = self.generate_dense_text(n_samples)
        # Report statistics
        total_patterns = sum(self.count_pattern_matches(t) for t in texts)
        avg_density = np.mean([self.compute_pattern_density(t) for t in texts])
        print("\nDataset statistics:")
        print(f"  Samples: {len(texts)}")
        print(f"  Total pattern matches: {total_patterns}")
        print(f"  Avg pattern density: {avg_density:.2%}")
        print(f"  Patterns per sample: {total_patterns / len(texts):.1f}")
        return Dataset.from_dict({"text": texts})
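# Hypothetical standalone usage of the generator (assumes `model` and
# `tokenizer` are already loaded, as in BreakthroughExperiment.run(), and
# uses the ". The" / ", and" merges only as illustrative examples):
#
#   gen = TargetedDataGenerator(model, tokenizer, [". The", ", and"],
#                               BreakthroughConfig())
#   ds = gen.create_dataset(50)  # ~50 pattern-dense samples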
class TargetedAdaptationTrainer:
    """Fine-tuning that specifically teaches the model to USE the new tokens."""

    def __init__(self, model, tokenizer, merged_tokens: List[str], config: BreakthroughConfig):
        self.model = model
        self.tokenizer = tokenizer
        self.merged_tokens = merged_tokens
        self.config = config

    def train(self) -> nn.Module:
        print("\n" + "=" * 60)
        print("TARGETED ADAPTATION TRAINING")
        print("=" * 60)
        print(f"Teaching model to use {len(self.merged_tokens)} new tokens")
        print(f"Merged tokens: {self.merged_tokens[:5]}...")

        # Generate pattern-dense data
        generator = TargetedDataGenerator(
            self.model, self.tokenizer, self.merged_tokens, self.config
        )
        dataset = generator.create_dataset(self.config.adaptation_samples)

        # Tokenize - this is where the merged tokens get used
        def tokenize_fn(examples):
            return self.tokenizer(
                examples["text"],
                truncation=True,
                max_length=128,  # Reduced
                padding="max_length",
            )

        tokenized = dataset.map(tokenize_fn, batched=True, remove_columns=["text"])

        # Verify merged tokens appear in the tokenized data
        sample_ids = tokenized[0]["input_ids"]
        new_token_ids = set(
            self.tokenizer.convert_tokens_to_ids(t) for t in self.merged_tokens
        )
        found = sum(1 for tok_id in sample_ids if tok_id in new_token_ids)
        print(f"  New tokens in sample: {found}")

        # LoRA setup
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=16,  # Reduced for memory
            lora_alpha=32,
            lora_dropout=0.05,
            target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
        )
        # Prepare for quantized training
        self.model = prepare_model_for_kbit_training(self.model)
        peft_model = get_peft_model(self.model, lora_config)
        peft_model.print_trainable_parameters()

        # Training
        training_args = TrainingArguments(
            output_dir=f"{self.config.output_dir}/adaptation",
            max_steps=self.config.adaptation_steps,
            per_device_train_batch_size=self.config.adaptation_batch_size,
            gradient_accumulation_steps=self.config.gradient_accumulation,
            learning_rate=self.config.adaptation_lr,
            warmup_steps=15,
            logging_steps=10,
            save_strategy="no",
            fp16=True,
            report_to="none",
            dataloader_pin_memory=False,
        )
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,
        )
        trainer = Trainer(
            model=peft_model,
            args=training_args,
            train_dataset=tokenized,
            data_collator=data_collator,
        )
        print("\nTraining on pattern-dense data...")
        trainer.train()

        # Merge weights
        print("Merging LoRA weights...")
        merged_model = peft_model.merge_and_unload()

        # Quick verification
        print("\nVerifying adaptation...")
        self._verify_adaptation(merged_model)
        return merged_model

    def _verify_adaptation(self, model):
        """Verify the model uses the new tokens correctly."""
        test_prompt = "The research showed results. The"
        inputs = self.tokenizer(test_prompt, return_tensors="pt").to(self.config.device)
        with torch.no_grad():
            outputs = model.generate(
                inputs.input_ids,
                max_new_tokens=30,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        print(f"  Test generation: '{response[:100]}...'")
class ExtendedRSI:
    """Extended RSI to test for a ceiling breakthrough."""

    def __init__(self, model, tokenizer, config: BreakthroughConfig):
        self.model = model
        self.tokenizer = tokenizer
        self.config = config
        self.iteration_history = []
        self.baseline_quality = None

    def evaluate_quality(self, n_samples: Optional[int] = None) -> Dict:
        if n_samples is None:
            n_samples = self.config.eval_samples
        prompts = [
            "Explain the concept of recursion in programming:",
            "What are the key principles of effective communication?",
            "Describe the process of photosynthesis:",
            "How do neural networks learn from data?",
            "What is the scientific method and why is it important?",
            "Explain the relationship between supply and demand:",
            "What are the main challenges in renewable energy?",
            "How does memory work in the human brain?",
            "Describe the structure of an atom:",
            "What factors influence climate patterns?",
        ]
        metrics = {
            "coherence": [],
            "completeness": [],
            "repetition_rate": [],
            "avg_length": [],
        }
        for prompt in prompts[:n_samples]:
            try:
                inputs = self.tokenizer(prompt, return_tensors="pt").to(self.config.device)
                with torch.no_grad():
                    outputs = self.model.generate(
                        inputs.input_ids,
                        max_new_tokens=100,
                        temperature=0.7,
                        do_sample=True,
                        pad_token_id=self.tokenizer.eos_token_id,
                        repetition_penalty=1.1,
                    )
                response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                response = response[len(prompt):].strip()
                tokens = response.split()
                # Repetition: 1 - (unique tokens / total tokens)
                if len(tokens) > 0:
                    unique_ratio = len(set(tokens)) / len(tokens)
                    metrics["repetition_rate"].append(1 - unique_ratio)
                else:
                    metrics["repetition_rate"].append(1.0)
                metrics["avg_length"].append(len(tokens))
                metrics["coherence"].append(1.0 if len(tokens) > 10 else 0.5)
                metrics["completeness"].append(1.0 if response and response[-1] in ".!?" else 0.7)
            except Exception:
                continue
        result = {
            "coherence": np.mean(metrics["coherence"]) if metrics["coherence"] else 0,
            "completeness": np.mean(metrics["completeness"]) if metrics["completeness"] else 0,
            "repetition_rate": np.mean(metrics["repetition_rate"]) if metrics["repetition_rate"] else 1,
            "avg_length": np.mean(metrics["avg_length"]) if metrics["avg_length"] else 0,
        }
        result["quality_score"] = (
            result["coherence"] * 0.3
            + result["completeness"] * 0.3
            + (1 - result["repetition_rate"]) * 0.4
        )
        return result
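    # Worked example of the composite score: coherence 1.0, completeness 0.7,
    # repetition_rate 0.2 gives 1.0*0.3 + 0.7*0.3 + (1 - 0.2)*0.4 = 0.83.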
    def generate_rsi_data(self, n_samples: int = 30) -> Dataset:  # Reduced default
        prompts = [
            "Write a clear explanation of",
            "Describe in detail how",
            "Explain the relationship between",
            "What are the key aspects of",
            "Provide an analysis of",
        ]
        topics = [
            "machine learning algorithms", "climate systems", "economic markets",
            "cognitive processes", "technological change", "scientific methods",
            "social structures", "mathematical proofs", "biological evolution",
            "energy systems",
        ]
        texts = []
        for prompt in prompts:
            for topic in topics:
                full_prompt = f"{prompt} {topic}:"
                try:
                    inputs = self.tokenizer(full_prompt, return_tensors="pt").to(self.config.device)
                    with torch.no_grad():
                        outputs = self.model.generate(
                            inputs.input_ids,
                            max_new_tokens=60,  # Reduced
                            temperature=0.7,
                            do_sample=True,
                            pad_token_id=self.tokenizer.eos_token_id,
                            repetition_penalty=1.1,
                        )
                    text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                    texts.append(text)
                except Exception:
                    continue
                if len(texts) >= n_samples:
                    break
            if len(texts) >= n_samples:
                break
        return Dataset.from_dict({"text": texts})
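    # The prompt x topic grid above yields up to 5 * 10 = 50 candidate
    # generations, but both loops exit as soon as n_samples (30 by default)
    # texts have been collected.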
    def micro_train(self, dataset: Dataset):
        """Single LoRA micro-training pass (standalone helper; run() below
        inlines the same logic so it can roll back failed merges)."""
        def tokenize_fn(examples):
            return self.tokenizer(
                examples["text"],
                truncation=True,
                max_length=128,  # Reduced
                padding="max_length",
            )

        tokenized = dataset.map(tokenize_fn, batched=True, remove_columns=["text"])
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            r=8,
            lora_alpha=16,
            lora_dropout=0.05,
            target_modules=["q_proj", "v_proj"],
        )
        # Prepare model for quantized training
        model_for_training = prepare_model_for_kbit_training(self.model)
        peft_model = get_peft_model(model_for_training, lora_config)
        training_args = TrainingArguments(
            output_dir=f"{self.config.output_dir}/rsi_temp",
            max_steps=self.config.rsi_micro_steps,
            per_device_train_batch_size=1,  # Reduced
            gradient_accumulation_steps=4,  # Increased
            learning_rate=self.config.rsi_lr,
            warmup_steps=2,
            logging_steps=100,
            fp16=True,
            report_to="none",
            save_strategy="no",
            dataloader_pin_memory=False,
        )
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,
        )
        trainer = Trainer(
            model=peft_model,
            args=training_args,
            train_dataset=tokenized,
            data_collator=data_collator,
        )
        trainer.train()
        self.model = peft_model.merge_and_unload()
    def run(self) -> Dict:
        print("\n" + "=" * 60)
        print("EXTENDED RSI - CEILING BREAKTHROUGH TEST")
        print("=" * 60)
        print("Previous ceiling: 3-5 iterations")
        print("Target: >5 successful iterations")
        print(f"Max attempts: {self.config.max_rsi_iterations}")

        # Baseline
        print("\nEstablishing baseline...")
        self.baseline_quality = self.evaluate_quality()
        print(f"Baseline: {self.baseline_quality['quality_score']:.4f}")

        successful = 0
        consecutive_failures = 0

        # The quantized base is never modified directly. RSI works by:
        # train LoRA -> evaluate -> if good, keep the merged model; if bad, skip the merge.
        for iteration in range(1, self.config.max_rsi_iterations + 1):
            print(f"\n--- RSI Iteration {iteration} ---")

            # Self-generate data BEFORE any training
            print("  Generating data...")
            rsi_data = self.generate_rsi_data(25)

            # Set up a fresh LoRA for this iteration
            def tokenize_fn(examples):
                return self.tokenizer(
                    examples["text"],
                    truncation=True,
                    max_length=128,
                    padding="max_length",
                )

            tokenized = rsi_data.map(tokenize_fn, batched=True, remove_columns=["text"])
            lora_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                r=8,
                lora_alpha=16,
                lora_dropout=0.05,
                target_modules=["q_proj", "v_proj"],
            )
            # Apply LoRA (don't run prepare_model_for_kbit_training again if already done)
            try:
                peft_model = get_peft_model(self.model, lora_config)
            except Exception:
                # Model might still be a PEFT model from a previous iteration
                if hasattr(self.model, "merge_and_unload"):
                    self.model = self.model.merge_and_unload()
                peft_model = get_peft_model(self.model, lora_config)
            training_args = TrainingArguments(
                output_dir=f"{self.config.output_dir}/rsi_temp",
                max_steps=self.config.rsi_micro_steps,
                per_device_train_batch_size=1,
                gradient_accumulation_steps=4,
                learning_rate=self.config.rsi_lr,
                warmup_steps=2,
                logging_steps=100,
                fp16=True,
                report_to="none",
                save_strategy="no",
                dataloader_pin_memory=False,
            )
            data_collator = DataCollatorForLanguageModeling(
                tokenizer=self.tokenizer,
                mlm=False,
            )
            print("  Training...")
            trainer = Trainer(
                model=peft_model,
                args=training_args,
                train_dataset=tokenized,
                data_collator=data_collator,
            )
            trainer.train()

            # Merge to evaluate
            merged_model = peft_model.merge_and_unload()

            # Evaluate
            print("  Evaluating...")
            old_model = self.model
            self.model = merged_model
            quality = self.evaluate_quality()
            relative = quality["quality_score"] / self.baseline_quality["quality_score"]
            print(f"  Quality: {quality['quality_score']:.4f} ({relative:.1%} of baseline)")

            if relative >= self.config.quality_threshold:
                successful += 1
                consecutive_failures = 0
                self.iteration_history.append({
                    "iteration": iteration,
                    "status": "success",
                    "quality": quality["quality_score"],
                    "relative": relative,
                })
                print(f"  ✅ SUCCESS (total: {successful})")
                # Keep the merged model; ratchet the baseline up if quality improved
                if quality["quality_score"] > self.baseline_quality["quality_score"]:
                    self.baseline_quality = quality
            else:
                consecutive_failures += 1
                self.iteration_history.append({
                    "iteration": iteration,
                    "status": "rollback",
                    "quality": quality["quality_score"],
                    "relative": relative,
                })
                print("  ⚠️ ROLLBACK (discarding LoRA)")
                # Don't keep the merged model; go back to the old one
                self.model = old_model
                del merged_model

            if consecutive_failures >= 3:
                print(f"\n🛑 CEILING HIT at iteration {iteration}")
                break

            torch.cuda.empty_cache()
            gc.collect()

        return {
            "successful_iterations": successful,
            "total_attempts": len(self.iteration_history),
            "ceiling_broken": successful > 5,
            "history": self.iteration_history,
            "final_quality": self.evaluate_quality(),
        }
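# Threshold semantics (worked example): with quality_threshold = 0.92 and a
# baseline quality_score of 0.80, an iteration counts as a success whenever
# its score is >= 0.92 * 0.80 = 0.736; anything lower triggers a rollback,
# and three consecutive rollbacks end the run.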
class BreakthroughExperiment:
    """Complete Loop 4 → RSI breakthrough experiment."""

    def __init__(self, config: BreakthroughConfig):
        self.config = config
        self.results = {}

    def run(self):
        print("=" * 70)
        print("LOOP 4 → RSI CEILING BREAKTHROUGH TEST")
        print("=" * 70)
        print("\nIf RSI goes past 5 iterations, the ceiling is broken.\n")

        # Load Loop 4 results
        print("Step 1: Loading Loop 4 results...")
        loop4_path = Path(self.config.loop4_results_path)
        if not loop4_path.exists():
            print(f"ERROR: Not found: {loop4_path}")
            return None
        with open(loop4_path) as f:
            loop4_results = json.load(f)

        # Extract merges
        merges = []
        if "all_improvements" in loop4_results:
            for imp in loop4_results["all_improvements"][:self.config.top_k_merges]:
                merges.append(imp["merged"])
        elif "top_stressed_pairs" in loop4_results:
            for pair in loop4_results["top_stressed_pairs"][:self.config.top_k_merges]:
                # Parse the "'. ' + 'The'" format
                tokens = pair["tokens"].replace("'", "").split(" + ")
                if len(tokens) == 2:
                    merges.append(tokens[0] + tokens[1])
        print(f"Merge candidates: {merges}")

        # Load model with 4-bit quantization
        print("\nStep 2: Loading model (4-bit quantized)...")
        tokenizer = AutoTokenizer.from_pretrained(
            self.config.model_path,
            local_files_only=True,
        )
        tokenizer.pad_token = tokenizer.eos_token
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_use_double_quant=True,
        )
        model = AutoModelForCausalLM.from_pretrained(
            self.config.model_path,
            quantization_config=bnb_config,
            device_map="auto",
            torch_dtype=torch.bfloat16,
            local_files_only=True,
        )
        print(f"Loaded: {model.config.hidden_size}d, {model.config.num_hidden_layers}L")
        print(f"VRAM: {torch.cuda.memory_allocated() / 1024**3:.1f}GB")

        # Resize embeddings
        print("\nStep 3: Resizing embeddings...")
        resizer = EmbeddingResizer(model, tokenizer)
        n_added, added_tokens = resizer.add_tokens_and_resize(merges)
        self.results["tokens_added"] = n_added

        # Targeted adaptation
        print("\nStep 4: Targeted adaptation training...")
        adapter = TargetedAdaptationTrainer(model, tokenizer, added_tokens, self.config)
        model = adapter.train()

        # Extended RSI
        print("\nStep 5: Extended RSI...")
        rsi = ExtendedRSI(model, tokenizer, self.config)
        rsi_results = rsi.run()
        self.results["rsi"] = rsi_results

        # Report
        print("\n" + "=" * 70)
        print("RESULTS")
        print("=" * 70)
        print(f"\nTokens added: {n_added}")
        print(f"RSI successful: {rsi_results['successful_iterations']}")
        print(f"RSI total: {rsi_results['total_attempts']}")
        if rsi_results["ceiling_broken"]:
            print("\n" + "🎯" * 25)
            print("  THE CEILING IS BROKEN")
            print("🎯" * 25)
            print(f"\nRSI: {rsi_results['successful_iterations']} iterations (>5)")
            print("Loop 4 tokenization co-evolution WORKS")
            print("The ladder extends.")
        else:
            print(f"\n⚠️ Ceiling at {rsi_results['successful_iterations']} iterations")
            if rsi_results["successful_iterations"] >= 5:
                print("  Matched previous ceiling - more refinement needed")
            else:
                print("  Below previous ceiling - investigate")

        # Save
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(exist_ok=True)
        save_data = {
            "tokens_added": n_added,
            "merges": merges,
            "rsi_successful": rsi_results["successful_iterations"],
            "ceiling_broken": rsi_results["ceiling_broken"],
            "history": rsi_results["history"],
        }
        with open(output_dir / "breakthrough_results.json", "w") as f:
            json.dump(save_data, f, indent=2)
        print(f"\nSaved to {output_dir}/breakthrough_results.json")
        return self.results
def main():
    config = BreakthroughConfig(
        model_path=".",
        loop4_results_path="loop4_full_results/loop4_full_results.json",
        top_k_merges=15,
        adaptation_samples=300,
        adaptation_steps=150,
        adaptation_lr=2e-5,
        max_rsi_iterations=10,
        rsi_micro_steps=20,
        quality_threshold=0.92,
    )
    experiment = BreakthroughExperiment(config)
    return experiment.run()


if __name__ == "__main__":
    main()