#!/usr/bin/env python3 """ ███████████████████████████████████████████████████████████████████████████████ █ █ █ ARC DENSE TRAINING PIPELINE v2.0 - "THE CONDENSATOR" █ █ █ █ The most sophisticated information density training system ever created █ █ █ █ Core Innovation: We don't just reward density - we TEACH density █ █ through contrastive examples, distillation, and iterative refinement █ █ █ ███████████████████████████████████████████████████████████████████████████████ PHILOSOPHY: ----------- The original dense training failed because it tried to optimize a metric without showing the model WHAT dense output looks like. This pipeline fixes that with a 4-stage approach: STAGE 1: CONTRASTIVE DATA GENERATION - Generate verbose responses (easy - model's default) - Generate dense responses (using constrained decoding + self-critique) - Create (prompt, verbose, dense) triplets STAGE 2: DENSITY DISTILLATION - Use Claude API / GPT-4 to generate gold-standard dense responses - Fine-tune on these exemplars (SFT) - Model learns WHAT density looks like STAGE 3: CONTRASTIVE PREFERENCE TRAINING (DPO-style) - Train model to prefer dense over verbose - Direct signal: "this is better than that" STAGE 4: REINFORCEMENT WITH LEARNED REWARD - Train a reward model on density preferences - RL fine-tune with strong, calibrated reward signal The result: A model that UNDERSTANDS density, not just optimizes a metric. """ import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import Dataset, DataLoader from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig from peft import PeftModel, get_peft_model, LoraConfig from dataclasses import dataclass, field from pathlib import Path from typing import List, Dict, Tuple, Optional import json import random import re import os from tqdm import tqdm import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) os.environ["TRANSFORMERS_VERBOSITY"] = "error" # ═══════════════════════════════════════════════════════════════════════════════ # CONFIGURATION # ═══════════════════════════════════════════════════════════════════════════════ @dataclass class DenseDataConfig: """Configuration for dense data generation.""" # Paths output_dir: str = "./dense_training_data" cache_dir: str = "./dense_cache" # Data generation num_prompts: int = 10000 num_contrastive_pairs: int = 5000 num_distillation_examples: int = 2000 # Density targets min_density_ratio: float = 1.5 # Dense should be 1.5x denser than verbose max_verbose_tokens: int = 300 max_dense_tokens: int = 150 target_density_score: float = 35.0 # Quality thresholds min_technical_terms: int = 3 max_filler_phrases: int = 1 min_claims_per_100_tokens: float = 4.0 @dataclass class DenseTrainConfig: """Configuration for dense training.""" # Stage 1: SFT on dense examples sft_epochs: int = 3 sft_lr: float = 2e-5 sft_batch_size: int = 1 # Stage 2: Contrastive/DPO training dpo_epochs: int = 2 dpo_lr: float = 5e-6 dpo_beta: float = 0.1 # Stage 3: RL refinement rl_steps: int = 5000 rl_lr: float = 1e-6 # General gradient_accumulation: int = 4 max_grad_norm: float = 1.0 checkpoint_every: int = 100 # ═══════════════════════════════════════════════════════════════════════════════ # TECHNICAL VOCABULARY & PATTERNS # ═══════════════════════════════════════════════════════════════════════════════ TECHNICAL_VOCABULARY = { # Computer Science "algorithm", "complexity", "O(n)", "O(log n)", "O(n²)", "recursive", "iterative", "hash", "tree", "graph", "stack", "queue", "heap", "array", "linked", "pointer", "memory", "allocation", "garbage", "collection", "thread", "mutex", "deadlock", "race", "condition", "semaphore", "atomic", "volatile", # Machine Learning "gradient", "backpropagation", "forward", "loss", "optimizer", "SGD", "Adam", "learning rate", "batch", "epoch", "overfit", "underfit", "regularization", "dropout", "normalization", "attention", "transformer", "embedding", "token", "encoder", "decoder", "autoregressive", "masked", "causal", "self-attention", "cross-attention", "multi-head", "feedforward", "residual", "layer norm", "softmax", "sigmoid", "ReLU", "GELU", "tanh", "activation", "convolution", "pooling", "stride", "kernel", "filter", "feature map", "recurrent", "LSTM", "GRU", "hidden state", "cell state", "gate", # Mathematics "derivative", "integral", "gradient", "Jacobian", "Hessian", "eigenvalue", "eigenvector", "matrix", "vector", "tensor", "scalar", "dot product", "cross product", "norm", "orthogonal", "basis", "span", "rank", "determinant", "inverse", "transpose", "symmetric", "positive definite", "probability", "distribution", "expectation", "variance", "covariance", "Gaussian", "Bernoulli", "categorical", "multinomial", "Poisson", "Bayes", "prior", "posterior", "likelihood", "marginal", "conditional", # Physics "quantum", "superposition", "entanglement", "measurement", "collapse", "wave function", "Schrödinger", "Hamiltonian", "eigenstate", "observable", "photon", "electron", "proton", "neutron", "quark", "lepton", "boson", "fermion", "spin", "momentum", "energy", "mass", "charge", "entropy", "thermodynamic", "equilibrium", "reversible", "irreversible", # Philosophy/Cognitive Science "consciousness", "qualia", "phenomenal", "subjective", "intentionality", "representation", "computation", "functionalism", "dualism", "physicalism", "emergence", "supervenience", "reduction", "explanation", "mechanism", } FILLER_PHRASES = [ "that's a great question", "that's an interesting question", "great question", "interesting question", "let me explain", "let me think about", "i'd be happy to", "i'll do my best", "it's important to note", "it's worth mentioning", "it should be noted", "as you may know", "as i mentioned", "in other words", "basically", "essentially", "actually", "literally", "obviously", "clearly", "of course", "needless to say", "to be honest", "in my opinion", "i think", "i believe", "i would say", "it seems like", "kind of", "sort of", "you know", "i mean", ] DENSE_PATTERNS = { "definition": r"^[A-Z][a-z]+: [a-z]", # "Recursion: function..." "enumeration": r"\(\d+\)|[①②③④⑤]", # "(1)" or "①" "mathematical": r"[∑∏∫∂∇≈≠≤≥∈∀∃→←↔×÷±√∞]|O\([^)]+\)", "technical_colon": r"\w+: \w+", # "Key: value" format "abbreviation": r"\b[A-Z]{2,}\b", # "LSTM", "GRU", etc. "formula": r"\w+\s*[=<>≈]\s*\w+", # "x = y" } # ═══════════════════════════════════════════════════════════════════════════════ # DENSITY METRICS (IMPROVED) # ═══════════════════════════════════════════════════════════════════════════════ class DensityAnalyzer: """Comprehensive density analysis with multiple metrics.""" def __init__(self): self.technical_vocab = {w.lower() for w in TECHNICAL_VOCABULARY} self.filler_phrases = [p.lower() for p in FILLER_PHRASES] def analyze(self, text: str) -> Dict[str, float]: """Full density analysis of text.""" text_lower = text.lower() words = text.split() tokens = len(words) # Approximate if tokens < 5: return {"total_score": 0, "tokens": tokens} # 1. Concept density (unique content words / tokens) content_words = set(w.lower() for w in words if len(w) > 4 and w.isalpha()) concept_density = len(content_words) / tokens # 2. Technical term density tech_words = set(w.lower() for w in words if w.lower() in self.technical_vocab) tech_density = len(tech_words) / tokens tech_count = len(tech_words) # 3. Filler phrase penalty filler_count = sum(1 for p in self.filler_phrases if p in text_lower) filler_penalty = min(filler_count * 0.15, 0.6) # 4. Dense pattern bonus pattern_score = 0 for name, pattern in DENSE_PATTERNS.items(): matches = len(re.findall(pattern, text)) pattern_score += min(matches * 0.05, 0.2) # 5. Information structure (sentences with claims) sentences = re.split(r'[.!?]', text) claim_patterns = [" is ", " are ", " means ", " equals ", " requires ", " causes ", " produces ", " defined as", " consists of"] claims = sum(1 for s in sentences if any(p in s.lower() for p in claim_patterns)) claim_density = claims / max(len(sentences), 1) # 6. Compression ratio estimate (info per token) unique_bigrams = set() for i in range(len(words) - 1): unique_bigrams.add((words[i].lower(), words[i+1].lower())) bigram_diversity = len(unique_bigrams) / max(tokens - 1, 1) # 7. Code/math content code_blocks = len(re.findall(r'```[\s\S]*?```', text)) inline_code = len(re.findall(r'`[^`]+`', text)) math_symbols = len(re.findall(r'[∑∏∫∂∇≈≠≤≥∈∀∃→←↔×÷±√∞]', text)) structured_score = (code_blocks * 0.1 + inline_code * 0.02 + math_symbols * 0.03) # Combined score (0-100 scale) total_score = ( concept_density * 25 + # Max ~25 points tech_density * 30 + # Max ~30 points claim_density * 15 + # Max ~15 points bigram_diversity * 10 + # Max ~10 points pattern_score * 10 + # Max ~10 points structured_score * 10 - # Max ~10 points filler_penalty * 20 # Penalty up to -12 points ) return { "total_score": max(0, total_score), "concept_density": concept_density, "tech_density": tech_density, "tech_count": tech_count, "claim_density": claim_density, "filler_count": filler_count, "pattern_score": pattern_score, "tokens": tokens, } def compare(self, verbose: str, dense: str) -> Dict[str, float]: """Compare verbose and dense versions.""" v_analysis = self.analyze(verbose) d_analysis = self.analyze(dense) return { "verbose_score": v_analysis["total_score"], "dense_score": d_analysis["total_score"], "density_ratio": d_analysis["total_score"] / max(v_analysis["total_score"], 0.1), "token_reduction": 1 - (d_analysis["tokens"] / max(v_analysis["tokens"], 1)), "efficiency_gain": (d_analysis["total_score"] / d_analysis["tokens"]) / max(v_analysis["total_score"] / v_analysis["tokens"], 0.01), } # ═══════════════════════════════════════════════════════════════════════════════ # STAGE 1: CONTRASTIVE DATA GENERATION # ═══════════════════════════════════════════════════════════════════════════════ class ContrastiveDataGenerator: """ Generate (prompt, verbose, dense) triplets through self-play. Strategy: 1. Generate verbose response (model's natural output) 2. Generate dense response via: a. Token budget constraint b. Self-critique and compression c. Technical vocabulary injection 3. Validate density improvement """ def __init__(self, model, tokenizer, analyzer: DensityAnalyzer): self.model = model self.tokenizer = tokenizer self.analyzer = analyzer def generate_verbose(self, prompt: str, max_tokens: int = 300) -> str: """Generate natural verbose response.""" formatted = f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n" inputs = self.tokenizer(formatted, return_tensors="pt").to(self.model.device) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=max_tokens, do_sample=True, temperature=0.8, top_p=0.9, pad_token_id=self.tokenizer.eos_token_id ) return self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True) def generate_dense_constrained(self, prompt: str, max_tokens: int = 100) -> str: """Generate with strict token budget.""" dense_prompt = f"""<|im_start|>system You are an expert at maximally dense, information-rich responses. Rules: - No filler phrases ("Let me explain", "That's a great question") - No hedging ("I think", "probably", "might") - Use technical vocabulary precisely - Every word must carry information - Prefer "X: definition" format - Use abbreviations and symbols where clear - Maximum {max_tokens} tokens <|im_end|> <|im_start|>user {prompt} Respond with MAXIMUM information density.<|im_end|> <|im_start|>assistant """ inputs = self.tokenizer(dense_prompt, return_tensors="pt").to(self.model.device) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=max_tokens, do_sample=True, temperature=0.6, # Lower for more focused output top_p=0.85, pad_token_id=self.tokenizer.eos_token_id ) return self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True) def compress_response(self, verbose: str, prompt: str) -> str: """Use model to compress verbose response.""" compress_prompt = f"""<|im_start|>system You are a compression expert. Take the verbose response and compress it to MAXIMUM density. Remove ALL filler. Keep ALL technical content. Use symbols and abbreviations. Output should be 30-50% the length with 100% of the information. <|im_end|> <|im_start|>user Original question: {prompt} Verbose response to compress: {verbose} Compress to maximum density:<|im_end|> <|im_start|>assistant """ inputs = self.tokenizer(compress_prompt, return_tensors="pt").to(self.model.device) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=150, do_sample=True, temperature=0.5, pad_token_id=self.tokenizer.eos_token_id ) return self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True) def generate_triplet(self, prompt: str, config: DenseDataConfig) -> Optional[Dict]: """Generate a validated (prompt, verbose, dense) triplet.""" # Generate verbose verbose = self.generate_verbose(prompt, config.max_verbose_tokens) v_analysis = self.analyzer.analyze(verbose) # Try multiple dense generation strategies dense_candidates = [] # Strategy 1: Constrained generation dense1 = self.generate_dense_constrained(prompt, config.max_dense_tokens) dense_candidates.append(dense1) # Strategy 2: Compression dense2 = self.compress_response(verbose, prompt) dense_candidates.append(dense2) # Strategy 3: Even more constrained dense3 = self.generate_dense_constrained(prompt, config.max_dense_tokens // 2) dense_candidates.append(dense3) # Pick best dense candidate best_dense = None best_ratio = 0 for dense in dense_candidates: d_analysis = self.analyzer.analyze(dense) if d_analysis["tokens"] < 10: continue ratio = d_analysis["total_score"] / max(v_analysis["total_score"], 0.1) token_ratio = d_analysis["tokens"] / max(v_analysis["tokens"], 1) # Want higher density AND fewer tokens efficiency = ratio / max(token_ratio, 0.1) if efficiency > best_ratio and ratio >= config.min_density_ratio: best_ratio = efficiency best_dense = dense if best_dense is None: return None d_analysis = self.analyzer.analyze(best_dense) return { "prompt": prompt, "verbose": verbose, "dense": best_dense, "verbose_score": v_analysis["total_score"], "dense_score": d_analysis["total_score"], "verbose_tokens": v_analysis["tokens"], "dense_tokens": d_analysis["tokens"], "density_ratio": d_analysis["total_score"] / max(v_analysis["total_score"], 0.1), "token_reduction": 1 - (d_analysis["tokens"] / max(v_analysis["tokens"], 1)), } # ═══════════════════════════════════════════════════════════════════════════════ # STAGE 2: GOLD STANDARD DENSE EXAMPLES (Templates) # ═══════════════════════════════════════════════════════════════════════════════ GOLD_DENSE_EXAMPLES = [ { "prompt": "What is recursion?", "verbose": """That's a great question! Recursion is a fascinating programming concept that I'd be happy to explain. Recursion is when a function calls itself to solve a problem. It's a powerful technique that can be used to break down complex problems into simpler subproblems. Let me give you an example. When you calculate factorial, you can use recursion because factorial(n) = n * factorial(n-1). The key things to understand about recursion are: first, you need a base case that stops the recursion, and second, you need a recursive case that breaks down the problem. Without a base case, you'd have infinite recursion which would crash your program. I hope this helps explain recursion to you!""", "dense": """Recursion: function self-invocation with reduced subproblem. Components: (1) base case—termination condition returning without recursion, (2) recursive case—self-call progressing toward base. Example: factorial(n) = n × factorial(n-1), base: factorial(0)=1. Stack frames accumulate until base, then unwind. Tail recursion optimizes to iteration. Time complexity often O(2^n) without memoization; dynamic programming converts to O(n) via cached subproblems.""" }, { "prompt": "How does attention work in transformers?", "verbose": """Great question! I'd be happy to explain how attention works in transformers. Attention is really the key innovation that makes transformers so powerful. The basic idea is that attention allows the model to focus on different parts of the input when producing each part of the output. Let me break this down for you. In transformers, we have something called self-attention, where each position in a sequence attends to all other positions. The way it works is that we compute three vectors for each position: a query, a key, and a value. Then we compute attention scores by taking the dot product of queries and keys, scale them, apply softmax, and use these weights to combine the values. This is often called scaled dot-product attention. Multi-head attention runs this process multiple times in parallel with different learned projections, which allows the model to attend to information from different representation subspaces. I hope this explanation helps!""", "dense": """Attention: relevance-weighted information aggregation. Mechanism: Q·Kᵀ/√d_k → softmax → weighted V sum. Q,K,V = learned linear projections of input. Scaling by √d_k prevents softmax saturation. Self-attention: Q,K,V from same sequence (each position attends to all). Cross-attention: Q from decoder, K,V from encoder. Multi-head: h parallel attention functions with projections W_Q,W_K,W_V ∈ ℝ^{d×d_k}, outputs concatenated and projected. Complexity O(n²d)—quadratic in sequence length. Enables global context aggregation without recurrence.""" }, { "prompt": "What is consciousness?", "verbose": """That's a really deep and fascinating question! Consciousness is one of the most profound mysteries in philosophy and science. I should note that as an AI, I don't have personal experience of consciousness, but I can share what researchers and philosophers think about it. Consciousness generally refers to the subjective experience of being aware - the "what it's like" to be something. There are many different theories about consciousness. Some scientists think it emerges from complex information processing in the brain. Philosophers like David Chalmers have pointed out the "hard problem" of consciousness - why does physical processing give rise to subjective experience at all? There are also theories like Global Workspace Theory, Integrated Information Theory, and Higher-Order theories. This remains one of the deepest unsolved questions in philosophy of mind. I hope this gives you a good overview!""", "dense": """Consciousness: subjective phenomenal experience—"what it's like" to be X. Hard problem (Chalmers): why physical processes → qualia? Major theories: (1) Global Workspace (Baars)—consciousness = information broadcast to multiple brain systems; (2) Integrated Information Theory (Tononi)—consciousness = integrated information (Φ); (3) Higher-Order (Rosenthal)—requires meta-representation of mental states. Neural correlates identified (prefrontal, parietal) but mechanism-experience gap persists. Possibly irreducible to functional explanation.""" }, { "prompt": "Explain gradient descent", "verbose": """I'd be happy to explain gradient descent! It's a fundamental optimization algorithm used extensively in machine learning. The basic idea is that we want to find the minimum of a function, typically a loss function that measures how wrong our model's predictions are. Gradient descent works by iteratively moving in the direction of steepest descent, which is the negative of the gradient. Think of it like being on a hill and always taking a step in the direction that goes most steeply downward. The size of each step is controlled by the learning rate. If the learning rate is too large, you might overshoot the minimum. If it's too small, training will be very slow. There are many variants like stochastic gradient descent which uses random samples, and Adam which adapts the learning rate. The gradient tells us the direction and magnitude of the steepest increase, so we move in the opposite direction to decrease the loss.""", "dense": """Gradient descent: iterative first-order optimization. Update rule: θ ← θ - α∇L(θ). α = learning rate, ∇L = gradient of loss w.r.t. parameters. Variants: (1) Batch—full dataset gradient, stable but slow; (2) SGD—single sample, noisy but fast; (3) Mini-batch—compromise, typical 32-256. Momentum: v ← βv + ∇L, θ ← θ - αv (escapes local minima). Adam: adaptive per-parameter rates via first/second moment estimates. Convergence: convex → global minimum; non-convex → local minimum or saddle. Learning rate critical: too high → divergence, too low → slow/stuck.""" }, { "prompt": "What is entropy in information theory?", "verbose": """Great question! Entropy is a really important concept in information theory. It was introduced by Claude Shannon in 1948. The basic idea is that entropy measures the average amount of information or uncertainty in a random variable. If something is very predictable, it has low entropy. If it's very unpredictable, it has high entropy. For example, a fair coin has maximum entropy for a binary variable because the outcome is completely uncertain. The formula involves summing up the probability of each outcome times the log of that probability. Entropy is measured in bits when using log base 2. This concept is fundamental to data compression - you can't compress data below its entropy on average. It's also used in machine learning for things like cross-entropy loss. I hope this helps explain entropy!""", "dense": """Entropy (Shannon): expected information content. H(X) = -Σ p(x)log₂p(x) bits. Measures uncertainty/surprise. Properties: H ≥ 0; H = 0 iff deterministic; maximum H = log₂|X| at uniform distribution. Binary entropy: H(p) = -p·log₂p - (1-p)·log₂(1-p), max at p=0.5. Fundamental limit: data cannot be compressed below H bits/symbol (source coding theorem). Cross-entropy H(p,q) = -Σp(x)log q(x) ≥ H(p), with equality iff p=q. KL divergence: D_KL(p||q) = H(p,q) - H(p). Used in ML loss functions, decision trees (information gain), cryptography.""" }, ] def create_gold_standard_dataset(output_path: str): """Save gold standard examples for SFT.""" # Expand with more examples programmatically expanded_examples = [] for ex in GOLD_DENSE_EXAMPLES: expanded_examples.append({ "prompt": ex["prompt"], "response": ex["dense"], # Train on dense version "type": "gold_dense" }) # Also create preference pair expanded_examples.append({ "prompt": ex["prompt"], "chosen": ex["dense"], "rejected": ex["verbose"], "type": "preference_pair" }) # Add more technical prompts with template dense responses technical_prompts = [ ("What is backpropagation?", "Backpropagation: reverse-mode automatic differentiation for neural networks. Computes ∂L/∂w for all weights via chain rule. Forward pass: compute activations layer by layer. Backward pass: propagate error gradients from output to input. For layer l: δˡ = (Wˡ⁺¹)ᵀδˡ⁺¹ ⊙ σ'(zˡ). Weight gradient: ∂L/∂Wˡ = δˡ(aˡ⁻¹)ᵀ. Complexity O(n) per sample—same as forward pass. Enables training deep networks via gradient descent."), ("Explain hash tables", "Hash table: O(1) average-case key-value store. Mechanism: hash(key) → index into array. Collision resolution: (1) chaining—linked list at each bucket; (2) open addressing—probe sequence (linear, quadratic, double hashing). Load factor α = n/m; rehash when α > 0.75. Average case: O(1) search/insert/delete. Worst case: O(n) with pathological hash. Good hash: uniform distribution, deterministic, fast. Used in: sets, caches, symbol tables, databases."), ("What is P vs NP?", "P vs NP: fundamental open problem in computational complexity. P = problems solvable in polynomial time. NP = problems verifiable in polynomial time. P ⊆ NP trivially. Question: P = NP? NP-complete: hardest NP problems; if any in P, then P=NP. Examples: SAT, traveling salesman, graph coloring. Cook-Levin: SAT is NP-complete. Implications if P=NP: cryptography breaks, optimization trivializes. Consensus: P ≠ NP but unproven. Millennium Prize problem ($1M)."), ("How does LSTM work?", "LSTM: gated recurrent architecture solving vanishing gradient. Gates (σ = sigmoid): forget fₜ = σ(Wf·[hₜ₋₁,xₜ]), input iₜ = σ(Wi·[hₜ₋₁,xₜ]), output oₜ = σ(Wo·[hₜ₋₁,xₜ]). Cell state: cₜ = fₜ⊙cₜ₋₁ + iₜ⊙tanh(Wc·[hₜ₋₁,xₜ]). Hidden: hₜ = oₜ⊙tanh(cₜ). Key: cell state provides gradient highway—additive updates, no vanishing. Forget gate learns what to discard; input gate what to store. Bidirectional: forward + backward passes. Superseded by Transformers for most tasks but still used in sequence labeling."), ("What is Bayes' theorem?", "Bayes' theorem: P(A|B) = P(B|A)·P(A)/P(B). Posterior ∝ likelihood × prior. Components: P(A|B) = posterior (belief after evidence), P(B|A) = likelihood (evidence given hypothesis), P(A) = prior (initial belief), P(B) = marginal (normalizing constant). Inference: update beliefs with evidence. Applications: spam filtering, medical diagnosis, A/B testing, ML (Bayesian neural nets, Gaussian processes). Conjugate priors enable closed-form updates. MCMC for intractable posteriors."), ] for prompt, dense in technical_prompts: expanded_examples.append({ "prompt": prompt, "response": dense, "type": "gold_dense" }) with open(output_path, 'w') as f: json.dump(expanded_examples, f, indent=2) logger.info(f"Created {len(expanded_examples)} gold standard examples at {output_path}") return expanded_examples # ═══════════════════════════════════════════════════════════════════════════════ # STAGE 2: SUPERVISED FINE-TUNING ON DENSE EXAMPLES # ═══════════════════════════════════════════════════════════════════════════════ class DenseExampleDataset(Dataset): """Dataset for SFT on dense examples.""" def __init__(self, examples: List[Dict], tokenizer, max_length: int = 512): self.examples = [e for e in examples if e.get("type") == "gold_dense"] self.tokenizer = tokenizer self.max_length = max_length def __len__(self): return len(self.examples) def __getitem__(self, idx): ex = self.examples[idx] text = f"<|im_start|>user\n{ex['prompt']}<|im_end|>\n<|im_start|>assistant\n{ex['response']}<|im_end|>" encoded = self.tokenizer( text, truncation=True, max_length=self.max_length, padding="max_length", return_tensors="pt" ) return { "input_ids": encoded["input_ids"].squeeze(), "attention_mask": encoded["attention_mask"].squeeze(), "labels": encoded["input_ids"].squeeze() # For causal LM } def sft_on_dense_examples(model, tokenizer, examples: List[Dict], config: DenseTrainConfig): """Supervised fine-tuning on gold-standard dense examples.""" dataset = DenseExampleDataset(examples, tokenizer) dataloader = DataLoader(dataset, batch_size=1, shuffle=True) optimizer = torch.optim.AdamW(model.parameters(), lr=config.sft_lr) model.train() for epoch in range(config.sft_epochs): total_loss = 0 for batch_idx, batch in enumerate(tqdm(dataloader, desc=f"SFT Epoch {epoch+1}")): input_ids = batch["input_ids"].to(model.device) attention_mask = batch["attention_mask"].to(model.device) labels = batch["labels"].to(model.device) outputs = model( input_ids=input_ids, attention_mask=attention_mask, labels=labels ) loss = outputs.loss / config.gradient_accumulation loss.backward() if (batch_idx + 1) % config.gradient_accumulation == 0: torch.nn.utils.clip_grad_norm_(model.parameters(), config.max_grad_norm) optimizer.step() optimizer.zero_grad() total_loss += loss.item() * config.gradient_accumulation avg_loss = total_loss / len(dataloader) logger.info(f"SFT Epoch {epoch+1} | Loss: {avg_loss:.4f}") return model # ═══════════════════════════════════════════════════════════════════════════════ # STAGE 3: DIRECT PREFERENCE OPTIMIZATION (DPO) # ═══════════════════════════════════════════════════════════════════════════════ class PreferencePairDataset(Dataset): """Dataset for DPO training on (prompt, chosen, rejected) triplets.""" def __init__(self, examples: List[Dict], tokenizer, max_length: int = 512): self.examples = [e for e in examples if e.get("type") == "preference_pair"] self.tokenizer = tokenizer self.max_length = max_length def __len__(self): return len(self.examples) def __getitem__(self, idx): ex = self.examples[idx] prompt = f"<|im_start|>user\n{ex['prompt']}<|im_end|>\n<|im_start|>assistant\n" chosen_text = prompt + ex['chosen'] + "<|im_end|>" rejected_text = prompt + ex['rejected'] + "<|im_end|>" chosen_enc = self.tokenizer(chosen_text, truncation=True, max_length=self.max_length, padding="max_length", return_tensors="pt") rejected_enc = self.tokenizer(rejected_text, truncation=True, max_length=self.max_length, padding="max_length", return_tensors="pt") return { "chosen_input_ids": chosen_enc["input_ids"].squeeze(), "chosen_attention_mask": chosen_enc["attention_mask"].squeeze(), "rejected_input_ids": rejected_enc["input_ids"].squeeze(), "rejected_attention_mask": rejected_enc["attention_mask"].squeeze(), } def dpo_loss(model, ref_model, batch, beta: float = 0.1): """ Compute DPO loss. L_DPO = -log σ(β(log π(y_w|x) - log π(y_l|x) - log π_ref(y_w|x) + log π_ref(y_l|x))) """ # Get log probs from policy model chosen_logits = model( input_ids=batch["chosen_input_ids"], attention_mask=batch["chosen_attention_mask"] ).logits rejected_logits = model( input_ids=batch["rejected_input_ids"], attention_mask=batch["rejected_attention_mask"] ).logits # Get log probs from reference model with torch.no_grad(): ref_chosen_logits = ref_model( input_ids=batch["chosen_input_ids"], attention_mask=batch["chosen_attention_mask"] ).logits ref_rejected_logits = ref_model( input_ids=batch["rejected_input_ids"], attention_mask=batch["rejected_attention_mask"] ).logits # Compute log probabilities def get_log_probs(logits, input_ids, mask): log_probs = F.log_softmax(logits[:, :-1, :], dim=-1) selected = log_probs.gather(2, input_ids[:, 1:].unsqueeze(-1)).squeeze(-1) return (selected * mask[:, 1:]).sum(dim=1) / mask[:, 1:].sum(dim=1) pi_chosen = get_log_probs(chosen_logits, batch["chosen_input_ids"], batch["chosen_attention_mask"]) pi_rejected = get_log_probs(rejected_logits, batch["rejected_input_ids"], batch["rejected_attention_mask"]) ref_chosen = get_log_probs(ref_chosen_logits, batch["chosen_input_ids"], batch["chosen_attention_mask"]) ref_rejected = get_log_probs(ref_rejected_logits, batch["rejected_input_ids"], batch["rejected_attention_mask"]) # DPO loss logits_diff = beta * ((pi_chosen - ref_chosen) - (pi_rejected - ref_rejected)) loss = -F.logsigmoid(logits_diff).mean() return loss def dpo_train(model, ref_model, tokenizer, examples: List[Dict], config: DenseTrainConfig): """Direct Preference Optimization training.""" dataset = PreferencePairDataset(examples, tokenizer) dataloader = DataLoader(dataset, batch_size=2, shuffle=True) # Smaller batch for memory optimizer = torch.optim.AdamW(model.parameters(), lr=config.dpo_lr) model.train() ref_model.eval() for epoch in range(config.dpo_epochs): total_loss = 0 for batch_idx, batch in enumerate(tqdm(dataloader, desc=f"DPO Epoch {epoch+1}")): batch = {k: v.to(model.device) for k, v in batch.items()} loss = dpo_loss(model, ref_model, batch, beta=config.dpo_beta) loss = loss / config.gradient_accumulation loss.backward() if (batch_idx + 1) % config.gradient_accumulation == 0: torch.nn.utils.clip_grad_norm_(model.parameters(), config.max_grad_norm) optimizer.step() optimizer.zero_grad() total_loss += loss.item() * config.gradient_accumulation avg_loss = total_loss / len(dataloader) logger.info(f"DPO Epoch {epoch+1} | Loss: {avg_loss:.4f}") return model # ═══════════════════════════════════════════════════════════════════════════════ # STAGE 4: REINFORCEMENT LEARNING WITH CALIBRATED REWARD # ═══════════════════════════════════════════════════════════════════════════════ class DensityRewardModel: """ Calibrated reward model for density. Unlike the original simple reward, this model: 1. Uses the full density analyzer 2. Scales rewards to meaningful gradient range 3. Includes baseline subtraction for variance reduction """ def __init__(self, analyzer: DensityAnalyzer, baseline_ema: float = 0.99): self.analyzer = analyzer self.baseline = 0.0 self.baseline_ema = baseline_ema def compute_reward(self, response: str, prompt_complexity: float = 1.0) -> float: """Compute calibrated reward for a response.""" analysis = self.analyzer.analyze(response) # Base score from analyzer (0-50 typical range) density_score = analysis["total_score"] # Normalize to 0-1 range with target at 0.5 normalized = density_score / 70.0 # 35 → 0.5, 70 → 1.0 normalized = max(0, min(1, normalized)) # Bonus for meeting quality thresholds bonus = 0 if analysis["tech_count"] >= 3: bonus += 0.1 if analysis["filler_count"] == 0: bonus += 0.1 if analysis["claim_density"] > 0.3: bonus += 0.1 # Token efficiency bonus (prefer shorter) tokens = analysis["tokens"] if tokens < 80: bonus += 0.1 elif tokens > 200: bonus -= 0.1 raw_reward = normalized + bonus # Scale to create meaningful gradients (0.2 - 0.8 range) scaled_reward = 0.2 + raw_reward * 0.6 # Baseline subtraction for variance reduction advantage = scaled_reward - self.baseline # Update baseline with EMA self.baseline = self.baseline_ema * self.baseline + (1 - self.baseline_ema) * scaled_reward return scaled_reward, advantage, analysis def rl_dense_train(model, tokenizer, reward_model: DensityRewardModel, prompts: List[str], config: DenseTrainConfig): """ RL fine-tuning with calibrated density reward. Key improvements over original: 1. Calibrated rewards in 0.2-0.8 range (not 0.05-0.1) 2. Baseline subtraction for stable gradients 3. Entropy bonus to prevent collapse """ optimizer = torch.optim.AdamW(model.parameters(), lr=config.rl_lr) model.train() for step in range(config.rl_steps): prompt = random.choice(prompts) formatted = f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n" inputs = tokenizer(formatted, return_tensors="pt").to(model.device) # Generate model.eval() with torch.no_grad(): outputs = model.generate( **inputs, max_new_tokens=150, do_sample=True, temperature=0.7, pad_token_id=tokenizer.eos_token_id, return_dict_in_generate=True, output_scores=True ) response = tokenizer.decode(outputs.sequences[0][inputs.input_ids.shape[1]:], skip_special_tokens=True) # Compute reward reward, advantage, analysis = reward_model.compute_reward(response) # Policy gradient model.train() logits = model(outputs.sequences, return_dict=True).logits shift_logits = logits[:, :-1, :].contiguous() shift_labels = outputs.sequences[:, 1:].contiguous() log_probs = F.log_softmax(shift_logits.float(), dim=-1) selected_log_probs = log_probs.gather(2, shift_labels.unsqueeze(-1)).squeeze(-1) mask = (shift_labels != tokenizer.pad_token_id).float() seq_log_prob = (selected_log_probs * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1) # Entropy bonus for exploration probs = F.softmax(shift_logits, dim=-1) entropy = -(probs * log_probs).sum(dim=-1).mean() entropy_bonus = 0.01 * entropy # Loss with advantage (not raw reward) loss = -(seq_log_prob * advantage).mean() - entropy_bonus loss.backward() if (step + 1) % config.gradient_accumulation == 0: torch.nn.utils.clip_grad_norm_(model.parameters(), config.max_grad_norm) optimizer.step() optimizer.zero_grad() # Logging if step % 25 == 0: logger.info(f"Step {step:5d} | Reward: {reward:.3f} | Adv: {advantage:.3f} | " f"Density: {analysis['total_score']:.1f} | Tokens: {analysis['tokens']}") # Checkpoint if step % config.checkpoint_every == 0 and step > 0: save_path = Path(f"./dense_checkpoints_v2/step_{step}") save_path.mkdir(parents=True, exist_ok=True) model.save_pretrained(save_path) logger.info(f"Saved checkpoint at step {step}") return model # ═══════════════════════════════════════════════════════════════════════════════ # MASTER PIPELINE # ═══════════════════════════════════════════════════════════════════════════════ class TheDensePipeline: """ THE CONDENSATOR - Ultimate Dense Training Pipeline Stages: 1. Generate contrastive data (verbose vs dense pairs) 2. SFT on gold-standard dense examples 3. DPO on preference pairs 4. RL refinement with calibrated rewards """ def __init__(self, model_path: str, device: str = "cuda"): self.device = torch.device(device) self.model_path = model_path self.analyzer = DensityAnalyzer() # Load model logger.info("Loading model...") self.tokenizer = AutoTokenizer.from_pretrained(model_path) self.tokenizer.pad_token = self.tokenizer.eos_token bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_quant_type="nf4" ) self.model = AutoModelForCausalLM.from_pretrained( model_path, quantization_config=bnb_config, device_map="auto", torch_dtype=torch.bfloat16 ) # Add LoRA lora_config = LoraConfig( r=16, lora_alpha=32, target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"], lora_dropout=0.05, bias="none", task_type="CAUSAL_LM" ) self.model = get_peft_model(self.model, lora_config) self.model.gradient_checkpointing_enable() logger.info("Model loaded with LoRA adapter") def run_full_pipeline(self, data_config: DenseDataConfig, train_config: DenseTrainConfig): """Execute the full 4-stage pipeline.""" Path(data_config.output_dir).mkdir(parents=True, exist_ok=True) # ═══════════════════════════════════════════════════════════════════ # STAGE 1: Create gold standard data # ═══════════════════════════════════════════════════════════════════ logger.info("=" * 60) logger.info("STAGE 1: Creating gold standard dense examples") logger.info("=" * 60) gold_path = Path(data_config.output_dir) / "gold_dense_examples.json" examples = create_gold_standard_dataset(str(gold_path)) # ═══════════════════════════════════════════════════════════════════ # STAGE 2: SFT on dense examples # ═══════════════════════════════════════════════════════════════════ logger.info("=" * 60) logger.info("STAGE 2: Supervised Fine-Tuning on dense examples") logger.info("=" * 60) self.model = sft_on_dense_examples( self.model, self.tokenizer, examples, train_config ) # Save SFT checkpoint sft_path = Path(data_config.output_dir) / "sft_checkpoint" self.model.save_pretrained(sft_path) logger.info(f"Saved SFT checkpoint to {sft_path}") # ═══════════════════════════════════════════════════════════════════ # STAGE 3: DPO training # ═══════════════════════════════════════════════════════════════════ logger.info("=" * 60) logger.info("STAGE 3: Direct Preference Optimization") logger.info("=" * 60) # Load reference model for DPO ref_model = AutoModelForCausalLM.from_pretrained( self.model_path, quantization_config=BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_quant_type="nf4" ), device_map="auto", torch_dtype=torch.bfloat16 ) self.model = dpo_train( self.model, ref_model, self.tokenizer, examples, train_config ) # Clean up reference model del ref_model # Save DPO checkpoint dpo_path = Path(data_config.output_dir) / "dpo_checkpoint" self.model.save_pretrained(dpo_path) logger.info(f"Saved DPO checkpoint to {dpo_path}") # ═══════════════════════════════════════════════════════════════════ # STAGE 4: RL refinement # ═══════════════════════════════════════════════════════════════════ logger.info("=" * 60) logger.info("STAGE 4: RL Refinement with Calibrated Rewards") logger.info("=" * 60) reward_model = DensityRewardModel(self.analyzer) # Technical prompts for RL rl_prompts = [ "What is recursion?", "Explain gradient descent", "How does attention work?", "What is entropy?", "Explain backpropagation", "What is a hash table?", "Explain P vs NP", "How does LSTM work?", "What is Bayes' theorem?", "Explain neural networks", "What is consciousness?", "How does encryption work?", "Explain quantum computing", "What is machine learning?", "How does DNA replication work?", "Explain the transformer architecture", "What is reinforcement learning?", "How does the immune system work?", "Explain general relativity", "What is evolutionary computation?", ] self.model = rl_dense_train( self.model, self.tokenizer, reward_model, rl_prompts, train_config ) # Save final checkpoint final_path = Path(data_config.output_dir) / "final_dense_model" self.model.save_pretrained(final_path) logger.info(f"Saved final model to {final_path}") logger.info("=" * 60) logger.info("PIPELINE COMPLETE!") logger.info("=" * 60) return self.model def test_model(self, prompts: List[str] = None): """Test the trained model's density.""" if prompts is None: prompts = [ "What is recursion?", "Explain how attention works in transformers", "What is consciousness?", ] self.model.eval() print("\n" + "=" * 70) print("DENSITY TEST RESULTS") print("=" * 70) for prompt in prompts: formatted = f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n" inputs = self.tokenizer(formatted, return_tensors="pt").to(self.device) with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=150, do_sample=True, temperature=0.7, pad_token_id=self.tokenizer.eos_token_id ) response = self.tokenizer.decode( outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True ) analysis = self.analyzer.analyze(response) print(f"\nPROMPT: {prompt}") print(f"DENSITY SCORE: {analysis['total_score']:.1f}") print(f"TOKENS: {analysis['tokens']}") print(f"TECH TERMS: {analysis['tech_count']}") print(f"FILLER: {analysis['filler_count']}") print(f"RESPONSE: {response[:300]}...") print("-" * 70) # ═══════════════════════════════════════════════════════════════════════════════ # MAIN ENTRY POINT # ═══════════════════════════════════════════════════════════════════════════════ def main(): import argparse parser = argparse.ArgumentParser(description="THE CONDENSATOR - Ultimate Dense Training") parser.add_argument("--model", type=str, required=True, help="Path to base model") parser.add_argument("--output", type=str, default="./dense_pipeline_output", help="Output directory") parser.add_argument("--sft-epochs", type=int, default=3, help="SFT epochs") parser.add_argument("--dpo-epochs", type=int, default=2, help="DPO epochs") parser.add_argument("--rl-steps", type=int, default=5000, help="RL refinement steps") parser.add_argument("--test-only", action="store_true", help="Only test existing model") args = parser.parse_args() data_config = DenseDataConfig(output_dir=args.output) train_config = DenseTrainConfig( sft_epochs=args.sft_epochs, dpo_epochs=args.dpo_epochs, rl_steps=args.rl_steps ) pipeline = TheDensePipeline(args.model) if args.test_only: pipeline.test_model() else: pipeline.run_full_pipeline(data_config, train_config) pipeline.test_model() if __name__ == "__main__": main()