LoganResearch's picture
Upload training_scripts/the_condensator.py with huggingface_hub
b60e9c8 verified
#!/usr/bin/env python3
"""
███████████████████████████████████████████████████████████████████████████████
█                                                                             █
█  ARC DENSE TRAINING PIPELINE v2.0 - "THE CONDENSATOR"                       █
█                                                                             █
█  The most sophisticated information density training system ever created    █
█                                                                             █
█  Core Innovation: We don't just reward density - we TEACH density           █
█  through contrastive examples, distillation, and iterative refinement       █
█                                                                             █
███████████████████████████████████████████████████████████████████████████████
PHILOSOPHY:
-----------
The original dense training failed because it tried to optimize a metric
without showing the model WHAT dense output looks like.
This pipeline fixes that with a 4-stage approach:
STAGE 1: CONTRASTIVE DATA GENERATION
- Generate verbose responses (easy - model's default)
- Generate dense responses (using constrained decoding + self-critique)
- Create (prompt, verbose, dense) triplets
STAGE 2: DENSITY DISTILLATION
- Use Claude API / GPT-4 to generate gold-standard dense responses
- Fine-tune on these exemplars (SFT)
- Model learns WHAT density looks like
STAGE 3: CONTRASTIVE PREFERENCE TRAINING (DPO-style)
- Train model to prefer dense over verbose
- Direct signal: "this is better than that"
STAGE 4: REINFORCEMENT WITH LEARNED REWARD
- Train a reward model on density preferences
- RL fine-tune with strong, calibrated reward signal
The result: A model that UNDERSTANDS density, not just optimizes a metric.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel, get_peft_model, LoraConfig
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional
import json
import random
import re
import os
from tqdm import tqdm
import logging
# Route INFO-level (and above) records from this script through the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Ask the transformers library to log errors only.
# NOTE(review): this is assigned after `transformers` has already been imported
# above - confirm the library still honours the variable at this point.
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
# ═══════════════════════════════════════════════════════════════════════════════
# CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════════
@dataclass
class DenseDataConfig:
    """Settings that govern generation of the dense-training data."""

    # --- Filesystem locations -------------------------------------------------
    output_dir: str = "./dense_training_data"
    cache_dir: str = "./dense_cache"

    # --- Dataset sizes --------------------------------------------------------
    num_prompts: int = 10_000
    num_contrastive_pairs: int = 5_000
    num_distillation_examples: int = 2_000

    # --- Density targets ------------------------------------------------------
    # A dense candidate must score at least this multiple of the verbose score.
    min_density_ratio: float = 1.5
    max_verbose_tokens: int = 300
    max_dense_tokens: int = 150
    target_density_score: float = 35.0

    # --- Quality thresholds ---------------------------------------------------
    min_technical_terms: int = 3
    max_filler_phrases: int = 1
    min_claims_per_100_tokens: float = 4.0
@dataclass
class DenseTrainConfig:
    """Hyper-parameters shared by the SFT, DPO and RL training loops."""

    # --- Supervised fine-tuning on dense exemplars (pipeline stage 2) ---------
    sft_epochs: int = 3
    sft_lr: float = 2e-5
    sft_batch_size: int = 1

    # --- Contrastive / DPO preference training (pipeline stage 3) -------------
    dpo_epochs: int = 2
    dpo_lr: float = 5e-6
    dpo_beta: float = 0.1

    # --- RL refinement (pipeline stage 4) -------------------------------------
    rl_steps: int = 5_000
    rl_lr: float = 1e-6

    # --- Settings common to every stage ---------------------------------------
    gradient_accumulation: int = 4
    max_grad_norm: float = 1.0
    checkpoint_every: int = 100
# ═══════════════════════════════════════════════════════════════════════════════
# TECHNICAL VOCABULARY & PATTERNS
# ═══════════════════════════════════════════════════════════════════════════════
# Domain terms whose presence counts toward "technical density".
# Entries are matched case-insensitively by DensityAnalyzer (it lower-cases
# the whole set), so the casing here is purely for readability.
# Fixes: two entries had been corrupted by an encoding round-trip
# ("O(n²)", "Schrödinger"), and "gradient" was listed twice.
TECHNICAL_VOCABULARY = {
    # Computer Science
    "algorithm", "complexity", "O(n)", "O(log n)", "O(n²)", "recursive", "iterative",
    "hash", "tree", "graph", "stack", "queue", "heap", "array", "linked",
    "pointer", "memory", "allocation", "garbage", "collection", "thread", "mutex",
    "deadlock", "race", "condition", "semaphore", "atomic", "volatile",
    # Machine Learning
    "gradient", "backpropagation", "forward", "loss", "optimizer", "SGD", "Adam",
    "learning rate", "batch", "epoch", "overfit", "underfit", "regularization",
    "dropout", "normalization", "attention", "transformer", "embedding", "token",
    "encoder", "decoder", "autoregressive", "masked", "causal", "self-attention",
    "cross-attention", "multi-head", "feedforward", "residual", "layer norm",
    "softmax", "sigmoid", "ReLU", "GELU", "tanh", "activation",
    "convolution", "pooling", "stride", "kernel", "filter", "feature map",
    "recurrent", "LSTM", "GRU", "hidden state", "cell state", "gate",
    # Mathematics
    "derivative", "integral", "Jacobian", "Hessian", "eigenvalue",
    "eigenvector", "matrix", "vector", "tensor", "scalar", "dot product",
    "cross product", "norm", "orthogonal", "basis", "span", "rank",
    "determinant", "inverse", "transpose", "symmetric", "positive definite",
    "probability", "distribution", "expectation", "variance", "covariance",
    "Gaussian", "Bernoulli", "categorical", "multinomial", "Poisson",
    "Bayes", "prior", "posterior", "likelihood", "marginal", "conditional",
    # Physics
    "quantum", "superposition", "entanglement", "measurement", "collapse",
    "wave function", "Schrödinger", "Hamiltonian", "eigenstate", "observable",
    "photon", "electron", "proton", "neutron", "quark", "lepton", "boson",
    "fermion", "spin", "momentum", "energy", "mass", "charge",
    "entropy", "thermodynamic", "equilibrium", "reversible", "irreversible",
    # Philosophy/Cognitive Science
    "consciousness", "qualia", "phenomenal", "subjective", "intentionality",
    "representation", "computation", "functionalism", "dualism", "physicalism",
    "emergence", "supervenience", "reduction", "explanation", "mechanism",
}
# Lower-case phrases treated as zero-information filler. DensityAnalyzer looks
# for each one as a substring of the lower-cased text, so order does not affect
# scoring; it is kept stable for anything that indexes into the list.
FILLER_PHRASES = [
    # Praise for the question
    "that's a great question", "that's an interesting question",
    "great question", "interesting question",
    # Announcing the answer instead of giving it
    "let me explain", "let me think about", "i'd be happy to", "i'll do my best",
    # Throat-clearing transitions
    "it's important to note", "it's worth mentioning", "it should be noted",
    "as you may know", "as i mentioned", "in other words",
    # Empty intensifiers
    "basically", "essentially", "actually", "literally", "obviously", "clearly",
    "of course", "needless to say",
    # Hedges and opinion markers
    "to be honest", "in my opinion", "i think", "i believe", "i would say",
    "it seems like",
    # Conversational tics
    "kind of", "sort of", "you know", "i mean",
]
# Regexes whose matches signal terse, information-dense formatting.
# Fix: the symbol character classes had been corrupted by an encoding
# round-trip, so they matched stray letters such as "β" instead of the intended
# circled digits and mathematical operators. The intended symbols are restored.
DENSE_PATTERNS = {
    # "Recursion: function..." - anchored at the start of the text only
    # (DensityAnalyzer applies these without re.MULTILINE).
    "definition": r"^[A-Z][a-z]+: [a-z]",
    # "(1)" or a circled digit such as "①"
    "enumeration": r"\(\d+\)|[①②③④⑤]",
    # Mathematical operators/quantifiers, or big-O notation such as "O(n log n)"
    "mathematical": r"[∑∏∫∂∇≈≠≤≥∈∀∃→←↔×÷±√∞]|O\([^)]+\)",
    # "Key: value" format
    "technical_colon": r"\w+: \w+",
    # All-caps abbreviations: "LSTM", "GRU", etc.
    "abbreviation": r"\b[A-Z]{2,}\b",
    # "x = y", "a < b", "p ≈ q"
    "formula": r"\w+\s*[=<>≈]\s*\w+",
}
# ═══════════════════════════════════════════════════════════════════════════════
# DENSITY METRICS (IMPROVED)
# ═══════════════════════════════════════════════════════════════════════════════
class DensityAnalyzer:
    """Heuristic information-density scorer built from several text metrics.

    The word lists and the regex table default to the module-level
    ``TECHNICAL_VOCABULARY``, ``FILLER_PHRASES`` and ``DENSE_PATTERNS``; pass
    replacements to score against a different domain.

    Changes from the previous version:
      * texts shorter than five words now return the full result schema
        (all metrics zero) instead of a two-key dict, so callers can index
        any metric without a ``KeyError``;
      * vocabulary lookup ignores punctuation stuck to a word ("gradient,")
        and recognises multi-word terms ("learning rate");
      * empty fragments produced by the sentence split are not counted as
        sentences;
      * ``compare`` no longer divides by zero on empty input;
      * the math-symbol character class was repaired after encoding damage.
    """

    # Sentence punctuation trimmed from word edges before vocabulary lookup.
    _SENTENCE_PUNCT = ".,;:!?"
    # Wider trim set that additionally removes quotes and brackets.
    _EDGE_PUNCT = ".,;:!?\"'()[]{}"
    _MATH_SYMBOLS = r"[∑∏∫∂∇≈≠≤≥∈∀∃→←↔×÷±√∞]"
    # Substrings that mark a sentence as making a factual claim.
    _CLAIM_MARKERS = (
        " is ", " are ", " means ", " equals ", " requires ",
        " causes ", " produces ", " defined as", " consists of",
    )

    def __init__(
        self,
        technical_vocab: Optional[set] = None,
        filler_phrases: Optional[List[str]] = None,
        dense_patterns: Optional[Dict[str, str]] = None,
    ):
        """Build an analyzer.

        Args:
            technical_vocab: Terms counted as technical; defaults to
                ``TECHNICAL_VOCABULARY``. Stored lower-cased.
            filler_phrases: Phrases penalised as filler; defaults to
                ``FILLER_PHRASES``. Stored lower-cased.
            dense_patterns: Mapping of name -> regex rewarded as dense
                formatting; defaults to ``DENSE_PATTERNS``.
        """
        vocab = TECHNICAL_VOCABULARY if technical_vocab is None else technical_vocab
        fillers = FILLER_PHRASES if filler_phrases is None else filler_phrases
        self.technical_vocab = {w.lower() for w in vocab}
        self.filler_phrases = [p.lower() for p in fillers]
        self.dense_patterns = DENSE_PATTERNS if dense_patterns is None else dense_patterns

    def _technical_terms(self, lowered_words: List[str], text_lower: str) -> set:
        """Return the distinct vocabulary terms present in the text."""
        vocab = self.technical_vocab
        found = set()
        for raw in lowered_words:
            # Try the word as written first (keeps "o(n)" intact), then with
            # progressively more edge punctuation removed.
            for form in (raw, raw.strip(self._SENTENCE_PUNCT), raw.strip(self._EDGE_PUNCT)):
                if form in vocab:
                    found.add(form)
                    break
        # Multi-word terms can never equal a single whitespace-split word, so
        # look for them as substrings (same approach as the filler phrases).
        found.update(t for t in vocab if " " in t and t in text_lower)
        return found

    def analyze(self, text: str) -> Dict[str, float]:
        """Score ``text`` and return the individual metrics.

        Returns:
            A dict with keys ``total_score`` (never negative),
            ``concept_density``, ``tech_density``, ``tech_count``,
            ``claim_density``, ``filler_count``, ``pattern_score`` and
            ``tokens`` (whitespace-delimited word count, used as an
            approximation of the token count). Texts with fewer than five
            words score zero on every metric.
        """
        text_lower = text.lower()
        words = text.split()
        tokens = len(words)  # Approximate

        if tokens < 5:
            # Too short to score meaningfully; still return every key.
            return {
                "total_score": 0,
                "concept_density": 0.0,
                "tech_density": 0.0,
                "tech_count": 0,
                "claim_density": 0.0,
                "filler_count": 0,
                "pattern_score": 0,
                "tokens": tokens,
            }

        lowered = [w.lower() for w in words]

        # 1. Concept density (unique alphabetic words longer than 4 chars / tokens)
        trimmed = (w.strip(self._EDGE_PUNCT) for w in lowered)
        content_words = {w for w in trimmed if len(w) > 4 and w.isalpha()}
        concept_density = len(content_words) / tokens

        # 2. Technical term density
        tech_words = self._technical_terms(lowered, text_lower)
        tech_count = len(tech_words)
        tech_density = tech_count / tokens

        # 3. Filler phrase penalty (each distinct phrase counts once, capped)
        filler_count = sum(1 for p in self.filler_phrases if p in text_lower)
        filler_penalty = min(filler_count * 0.15, 0.6)

        # 4. Dense pattern bonus (each pattern contributes at most 0.2)
        pattern_score = 0
        for pattern in self.dense_patterns.values():
            pattern_score += min(len(re.findall(pattern, text)) * 0.05, 0.2)

        # 5. Information structure (share of sentences that make a claim).
        # Splitting on terminators leaves empty fragments (e.g. after the
        # final "."); they are dropped so they do not dilute the ratio.
        sentences = [s for s in re.split(r'[.!?]', text) if s.strip()]
        claims = sum(
            1 for s in sentences
            if any(marker in s.lower() for marker in self._CLAIM_MARKERS)
        )
        claim_density = claims / max(len(sentences), 1)

        # 6. Compression ratio estimate (distinct adjacent word pairs per token)
        unique_bigrams = set(zip(lowered, lowered[1:]))
        bigram_diversity = len(unique_bigrams) / max(tokens - 1, 1)

        # 7. Code/math content
        code_blocks = len(re.findall(r'```[\s\S]*?```', text))
        inline_code = len(re.findall(r'`[^`]+`', text))
        math_symbols = len(re.findall(self._MATH_SYMBOLS, text))
        structured_score = code_blocks * 0.1 + inline_code * 0.02 + math_symbols * 0.03

        # Weighted combination; the filler penalty is the only negative term.
        total_score = (
            concept_density * 25 +
            tech_density * 30 +
            claim_density * 15 +
            bigram_diversity * 10 +
            pattern_score * 10 +
            structured_score * 10 -
            filler_penalty * 20  # at most 0.6 * 20 = 12 points
        )

        return {
            "total_score": max(0, total_score),
            "concept_density": concept_density,
            "tech_density": tech_density,
            "tech_count": tech_count,
            "claim_density": claim_density,
            "filler_count": filler_count,
            "pattern_score": pattern_score,
            "tokens": tokens,
        }

    def compare(self, verbose: str, dense: str) -> Dict[str, float]:
        """Compare a verbose and a dense answer to the same prompt.

        Returns:
            A dict with both total scores, their ratio (``density_ratio``),
            the fractional word-count saving (``token_reduction``) and the
            ratio of score-per-token (``efficiency_gain``).
        """
        v_analysis = self.analyze(verbose)
        d_analysis = self.analyze(dense)

        # Guard the per-token divisions: either text may be empty.
        v_per_token = v_analysis["total_score"] / max(v_analysis["tokens"], 1)
        d_per_token = d_analysis["total_score"] / max(d_analysis["tokens"], 1)

        return {
            "verbose_score": v_analysis["total_score"],
            "dense_score": d_analysis["total_score"],
            "density_ratio": d_analysis["total_score"] / max(v_analysis["total_score"], 0.1),
            "token_reduction": 1 - (d_analysis["tokens"] / max(v_analysis["tokens"], 1)),
            "efficiency_gain": d_per_token / max(v_per_token, 0.01),
        }
# ═══════════════════════════════════════════════════════════════════════════════
# STAGE 1: CONTRASTIVE DATA GENERATION
# ═══════════════════════════════════════════════════════════════════════════════
class ContrastiveDataGenerator:
    """
    Generate (prompt, verbose, dense) triplets through self-play.

    Strategy:
    1. Generate verbose response (model's natural output)
    2. Generate dense response via:
       a. Token budget constraint
       b. Self-critique and compression
       c. A second, tighter token budget
    3. Validate density improvement

    The three generation methods share one sampling/decoding helper; only the
    chat prompt and the sampling settings differ between them.
    """

    def __init__(self, model, tokenizer, analyzer: DensityAnalyzer):
        """Store the generation model, its tokenizer and the density scorer."""
        self.model = model
        self.tokenizer = tokenizer
        self.analyzer = analyzer

    def _generate(self, formatted_prompt: str, max_new_tokens: int,
                  temperature: float, top_p: Optional[float] = None) -> str:
        """Sample a completion for an already-formatted chat prompt.

        Only the newly generated tokens are decoded; the prompt is sliced off.
        ``top_p`` is forwarded to ``generate`` only when given.
        """
        inputs = self.tokenizer(formatted_prompt, return_tensors="pt").to(self.model.device)
        sampling = {
            "max_new_tokens": max_new_tokens,
            "do_sample": True,
            "temperature": temperature,
            "pad_token_id": self.tokenizer.eos_token_id,
        }
        if top_p is not None:
            sampling["top_p"] = top_p
        with torch.no_grad():
            outputs = self.model.generate(**inputs, **sampling)
        prompt_length = inputs.input_ids.shape[1]
        return self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)

    def generate_verbose(self, prompt: str, max_tokens: int = 300) -> str:
        """Generate natural verbose response (no system prompt)."""
        formatted = f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n"
        return self._generate(formatted, max_tokens, temperature=0.8, top_p=0.9)

    def generate_dense_constrained(self, prompt: str, max_tokens: int = 100) -> str:
        """Generate with strict token budget and a density-focused system prompt."""
        dense_prompt = (
            "<|im_start|>system\n"
            "You are an expert at maximally dense, information-rich responses.\n"
            "Rules:\n"
            "- No filler phrases (\"Let me explain\", \"That's a great question\")\n"
            "- No hedging (\"I think\", \"probably\", \"might\")\n"
            "- Use technical vocabulary precisely\n"
            "- Every word must carry information\n"
            "- Prefer \"X: definition\" format\n"
            "- Use abbreviations and symbols where clear\n"
            f"- Maximum {max_tokens} tokens\n"
            "<|im_end|>\n"
            "<|im_start|>user\n"
            f"{prompt}\n"
            "Respond with MAXIMUM information density.<|im_end|>\n"
            "<|im_start|>assistant\n"
        )
        # Lower temperature / top_p than the verbose path for more focused output.
        return self._generate(dense_prompt, max_tokens, temperature=0.6, top_p=0.85)

    def compress_response(self, verbose: str, prompt: str, max_tokens: int = 150) -> str:
        """Use model to compress verbose response.

        Args:
            verbose: The verbose answer to be compressed.
            prompt: The question the verbose answer responds to.
            max_tokens: Generation budget for the compressed answer
                (previously fixed at 150, which remains the default).
        """
        compress_prompt = (
            "<|im_start|>system\n"
            "You are a compression expert. Take the verbose response and compress it to MAXIMUM density.\n"
            "Remove ALL filler. Keep ALL technical content. Use symbols and abbreviations.\n"
            "Output should be 30-50% the length with 100% of the information.\n"
            "<|im_end|>\n"
            "<|im_start|>user\n"
            f"Original question: {prompt}\n"
            "Verbose response to compress:\n"
            f"{verbose}\n"
            "Compress to maximum density:<|im_end|>\n"
            "<|im_start|>assistant\n"
        )
        return self._generate(compress_prompt, max_tokens, temperature=0.5)

    def generate_triplet(self, prompt: str, config: DenseDataConfig) -> Optional[Dict]:
        """Generate a validated (prompt, verbose, dense) triplet.

        Three dense candidates are produced and scored against the verbose
        answer. A candidate is eligible when it has at least 10 words and its
        score is at least ``config.min_density_ratio`` times the verbose
        score; among eligible candidates the one with the highest
        score-ratio-per-length-ratio wins.

        Returns:
            The triplet with its scores, or ``None`` when no candidate is
            eligible.
        """
        verbose = self.generate_verbose(prompt, config.max_verbose_tokens)
        v_analysis = self.analyzer.analyze(verbose)
        # Floors keep the ratios finite when the verbose answer scores ~0 or is empty.
        verbose_score = max(v_analysis["total_score"], 0.1)
        verbose_tokens = max(v_analysis["tokens"], 1)

        dense_candidates = [
            # Strategy 1: constrained generation
            self.generate_dense_constrained(prompt, config.max_dense_tokens),
            # Strategy 2: compression of the verbose answer, under the same
            # configured budget (equals the old fixed 150 by default)
            self.compress_response(verbose, prompt, config.max_dense_tokens),
            # Strategy 3: even more constrained
            self.generate_dense_constrained(prompt, config.max_dense_tokens // 2),
        ]

        best = None
        best_efficiency = 0
        for dense in dense_candidates:
            d_analysis = self.analyzer.analyze(dense)
            if d_analysis["tokens"] < 10:
                continue
            ratio = d_analysis["total_score"] / verbose_score
            token_ratio = d_analysis["tokens"] / verbose_tokens
            # Want higher density AND fewer tokens
            efficiency = ratio / max(token_ratio, 0.1)
            if efficiency > best_efficiency and ratio >= config.min_density_ratio:
                best_efficiency = efficiency
                # Keep the analysis so the winner is not scored a second time.
                best = (dense, d_analysis, ratio, token_ratio)

        if best is None:
            return None

        best_dense, d_analysis, ratio, token_ratio = best
        return {
            "prompt": prompt,
            "verbose": verbose,
            "dense": best_dense,
            "verbose_score": v_analysis["total_score"],
            "dense_score": d_analysis["total_score"],
            "verbose_tokens": v_analysis["tokens"],
            "dense_tokens": d_analysis["tokens"],
            "density_ratio": ratio,
            "token_reduction": 1 - token_ratio,
        }
# ═══════════════════════════════════════════════════════════════════════════════
# STAGE 2: GOLD STANDARD DENSE EXAMPLES (Templates)
# ═══════════════════════════════════════════════════════════════════════════════
# Hand-written (prompt, verbose, dense) exemplars. The "dense" text is used as
# an SFT target and as the preferred side of a preference pair, so its exact
# characters matter.
# Fix: the dense answers had been corrupted by an encoding round-trip (em
# dashes, arrows, Greek letters, sub/superscripts); the intended characters are
# restored. The verbose answers are unchanged.
GOLD_DENSE_EXAMPLES = [
    {
        "prompt": "What is recursion?",
        "verbose": """That's a great question! Recursion is a fascinating programming concept that I'd be happy to explain.
Recursion is when a function calls itself to solve a problem. It's a powerful technique that can be used to break down
complex problems into simpler subproblems. Let me give you an example. When you calculate factorial, you can use recursion
because factorial(n) = n * factorial(n-1). The key things to understand about recursion are: first, you need a base case
that stops the recursion, and second, you need a recursive case that breaks down the problem. Without a base case, you'd
have infinite recursion which would crash your program. I hope this helps explain recursion to you!""",
        "dense": """Recursion: function self-invocation with reduced subproblem. Components: (1) base case—termination
condition returning without recursion, (2) recursive case—self-call progressing toward base. Example: factorial(n) =
n × factorial(n-1), base: factorial(0)=1. Stack frames accumulate until base, then unwind. Tail recursion optimizes
to iteration. Time complexity often O(2^n) without memoization; dynamic programming converts to O(n) via cached subproblems."""
    },
    {
        "prompt": "How does attention work in transformers?",
        "verbose": """Great question! I'd be happy to explain how attention works in transformers. Attention is really
the key innovation that makes transformers so powerful. The basic idea is that attention allows the model to focus on
different parts of the input when producing each part of the output. Let me break this down for you. In transformers,
we have something called self-attention, where each position in a sequence attends to all other positions. The way it
works is that we compute three vectors for each position: a query, a key, and a value. Then we compute attention scores
by taking the dot product of queries and keys, scale them, apply softmax, and use these weights to combine the values.
This is often called scaled dot-product attention. Multi-head attention runs this process multiple times in parallel
with different learned projections, which allows the model to attend to information from different representation
subspaces. I hope this explanation helps!""",
        "dense": """Attention: relevance-weighted information aggregation. Mechanism: Q·Kᵀ/√d_k → softmax → weighted V sum.
Q,K,V = learned linear projections of input. Scaling by √d_k prevents softmax saturation. Self-attention: Q,K,V from
same sequence (each position attends to all). Cross-attention: Q from decoder, K,V from encoder. Multi-head: h parallel
attention functions with projections W_Q,W_K,W_V ∈ ℝ^{d×d_k}, outputs concatenated and projected. Complexity O(n²d)—quadratic
in sequence length. Enables global context aggregation without recurrence."""
    },
    {
        "prompt": "What is consciousness?",
        "verbose": """That's a really deep and fascinating question! Consciousness is one of the most profound mysteries
in philosophy and science. I should note that as an AI, I don't have personal experience of consciousness, but I can
share what researchers and philosophers think about it. Consciousness generally refers to the subjective experience of
being aware - the "what it's like" to be something. There are many different theories about consciousness. Some scientists
think it emerges from complex information processing in the brain. Philosophers like David Chalmers have pointed out the
"hard problem" of consciousness - why does physical processing give rise to subjective experience at all? There are also
theories like Global Workspace Theory, Integrated Information Theory, and Higher-Order theories. This remains one of the
deepest unsolved questions in philosophy of mind. I hope this gives you a good overview!""",
        "dense": """Consciousness: subjective phenomenal experience—"what it's like" to be X. Hard problem (Chalmers):
why physical processes → qualia? Major theories: (1) Global Workspace (Baars)—consciousness = information broadcast
to multiple brain systems; (2) Integrated Information Theory (Tononi)—consciousness = integrated information (Φ);
(3) Higher-Order (Rosenthal)—requires meta-representation of mental states. Neural correlates identified (prefrontal,
parietal) but mechanism-experience gap persists. Possibly irreducible to functional explanation."""
    },
    {
        "prompt": "Explain gradient descent",
        "verbose": """I'd be happy to explain gradient descent! It's a fundamental optimization algorithm used extensively
in machine learning. The basic idea is that we want to find the minimum of a function, typically a loss function that
measures how wrong our model's predictions are. Gradient descent works by iteratively moving in the direction of steepest
descent, which is the negative of the gradient. Think of it like being on a hill and always taking a step in the direction
that goes most steeply downward. The size of each step is controlled by the learning rate. If the learning rate is too
large, you might overshoot the minimum. If it's too small, training will be very slow. There are many variants like
stochastic gradient descent which uses random samples, and Adam which adapts the learning rate. The gradient tells us
the direction and magnitude of the steepest increase, so we move in the opposite direction to decrease the loss.""",
        "dense": """Gradient descent: iterative first-order optimization. Update rule: θ ← θ - α∇L(θ). α = learning rate,
∇L = gradient of loss w.r.t. parameters. Variants: (1) Batch—full dataset gradient, stable but slow; (2) SGD—single
sample, noisy but fast; (3) Mini-batch—compromise, typical 32-256. Momentum: v ← βv + ∇L, θ ← θ - αv (escapes local
minima). Adam: adaptive per-parameter rates via first/second moment estimates. Convergence: convex → global minimum;
non-convex → local minimum or saddle. Learning rate critical: too high → divergence, too low → slow/stuck."""
    },
    {
        "prompt": "What is entropy in information theory?",
        "verbose": """Great question! Entropy is a really important concept in information theory. It was introduced by
Claude Shannon in 1948. The basic idea is that entropy measures the average amount of information or uncertainty in a
random variable. If something is very predictable, it has low entropy. If it's very unpredictable, it has high entropy.
For example, a fair coin has maximum entropy for a binary variable because the outcome is completely uncertain. The
formula involves summing up the probability of each outcome times the log of that probability. Entropy is measured in
bits when using log base 2. This concept is fundamental to data compression - you can't compress data below its entropy
on average. It's also used in machine learning for things like cross-entropy loss. I hope this helps explain entropy!""",
        "dense": """Entropy (Shannon): expected information content. H(X) = -Σ p(x)log₂p(x) bits. Measures uncertainty/surprise.
Properties: H ≥ 0; H = 0 iff deterministic; maximum H = log₂|X| at uniform distribution. Binary entropy: H(p) = -p·log₂p
- (1-p)·log₂(1-p), max at p=0.5. Fundamental limit: data cannot be compressed below H bits/symbol (source coding theorem).
Cross-entropy H(p,q) = -Σp(x)log q(x) ≥ H(p), with equality iff p=q. KL divergence: D_KL(p||q) = H(p,q) - H(p). Used in
ML loss functions, decision trees (information gain), cryptography."""
    },
]
def create_gold_standard_dataset(output_path: str):
    """Build the gold-standard example list and save it as JSON.

    Every entry of ``GOLD_DENSE_EXAMPLES`` contributes two records: a
    ``"gold_dense"`` SFT record (prompt -> dense answer) and a
    ``"preference_pair"`` record (dense chosen over verbose). Five further
    prompt/answer templates defined here are added as ``"gold_dense"`` only.

    Changes from the previous version: the template answers had been
    corrupted by an encoding round-trip and are restored; the parent
    directory of ``output_path`` is created when missing; the file is opened
    with an explicit UTF-8 encoding.

    Args:
        output_path: Destination JSON file.

    Returns:
        The list of records that was written.
    """
    expanded_examples = []
    for ex in GOLD_DENSE_EXAMPLES:
        expanded_examples.append({
            "prompt": ex["prompt"],
            "response": ex["dense"],  # Train on dense version
            "type": "gold_dense"
        })
        # Also create preference pair
        expanded_examples.append({
            "prompt": ex["prompt"],
            "chosen": ex["dense"],
            "rejected": ex["verbose"],
            "type": "preference_pair"
        })

    # Additional technical prompts with template dense responses
    technical_prompts = [
        ("What is backpropagation?",
         "Backpropagation: reverse-mode automatic differentiation for neural networks. Computes ∂L/∂w for all weights via chain rule. Forward pass: compute activations layer by layer. Backward pass: propagate error gradients from output to input. For layer l: δˡ = (Wˡ⁺¹)ᵀδˡ⁺¹ ⊙ σ'(zˡ). Weight gradient: ∂L/∂Wˡ = δˡ(aˡ⁻¹)ᵀ. Complexity O(n) per sample—same as forward pass. Enables training deep networks via gradient descent."),
        ("Explain hash tables",
         "Hash table: O(1) average-case key-value store. Mechanism: hash(key) → index into array. Collision resolution: (1) chaining—linked list at each bucket; (2) open addressing—probe sequence (linear, quadratic, double hashing). Load factor α = n/m; rehash when α > 0.75. Average case: O(1) search/insert/delete. Worst case: O(n) with pathological hash. Good hash: uniform distribution, deterministic, fast. Used in: sets, caches, symbol tables, databases."),
        ("What is P vs NP?",
         "P vs NP: fundamental open problem in computational complexity. P = problems solvable in polynomial time. NP = problems verifiable in polynomial time. P ⊆ NP trivially. Question: P = NP? NP-complete: hardest NP problems; if any in P, then P=NP. Examples: SAT, traveling salesman, graph coloring. Cook-Levin: SAT is NP-complete. Implications if P=NP: cryptography breaks, optimization trivializes. Consensus: P ≠ NP but unproven. Millennium Prize problem ($1M)."),
        ("How does LSTM work?",
         "LSTM: gated recurrent architecture solving vanishing gradient. Gates (σ = sigmoid): forget fₜ = σ(Wf·[hₜ₋₁,xₜ]), input iₜ = σ(Wi·[hₜ₋₁,xₜ]), output oₜ = σ(Wo·[hₜ₋₁,xₜ]). Cell state: cₜ = fₜ⊙cₜ₋₁ + iₜ⊙tanh(Wc·[hₜ₋₁,xₜ]). Hidden: hₜ = oₜ⊙tanh(cₜ). Key: cell state provides gradient highway—additive updates, no vanishing. Forget gate learns what to discard; input gate what to store. Bidirectional: forward + backward passes. Superseded by Transformers for most tasks but still used in sequence labeling."),
        ("What is Bayes' theorem?",
         "Bayes' theorem: P(A|B) = P(B|A)·P(A)/P(B). Posterior ∝ likelihood × prior. Components: P(A|B) = posterior (belief after evidence), P(B|A) = likelihood (evidence given hypothesis), P(A) = prior (initial belief), P(B) = marginal (normalizing constant). Inference: update beliefs with evidence. Applications: spam filtering, medical diagnosis, A/B testing, ML (Bayesian neural nets, Gaussian processes). Conjugate priors enable closed-form updates. MCMC for intractable posteriors."),
    ]
    expanded_examples.extend(
        {"prompt": prompt, "response": dense, "type": "gold_dense"}
        for prompt, dense in technical_prompts
    )

    destination = Path(output_path)
    # The target directory may not exist yet on a fresh checkout.
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open('w', encoding="utf-8") as f:
        json.dump(expanded_examples, f, indent=2)

    logger.info(f"Created {len(expanded_examples)} gold standard examples at {output_path}")
    return expanded_examples
# ═══════════════════════════════════════════════════════════════════════════════
# STAGE 2: SUPERVISED FINE-TUNING ON DENSE EXAMPLES
# ═══════════════════════════════════════════════════════════════════════════════
class DenseExampleDataset(Dataset):
    """Dataset for SFT on dense examples.

    Only records whose ``"type"`` is ``"gold_dense"`` are kept. Each item is
    the chat-formatted prompt + response, tokenized and padded to
    ``max_length``.

    Changes from the previous version:
      * label positions that are padding (attention mask 0) are set to -100
        so they are excluded from the language-modelling loss; previously the
        labels were the raw ``input_ids``, padding included;
      * ``labels`` is an independent tensor rather than an alias of
        ``input_ids``;
      * only the leading batch dimension is squeezed, so a length-1 encoding
        stays 1-D.
    """

    # Label value skipped by the cross-entropy loss.
    IGNORE_INDEX = -100

    def __init__(self, examples: List[Dict], tokenizer, max_length: int = 512):
        self.examples = [e for e in examples if e.get("type") == "gold_dense"]
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for item ``idx``."""
        ex = self.examples[idx]
        text = f"<|im_start|>user\n{ex['prompt']}<|im_end|>\n<|im_start|>assistant\n{ex['response']}<|im_end|>"
        encoded = self.tokenizer(
            text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt"
        )
        # The tokenizer returns a batch of one; drop only that dimension.
        input_ids = encoded["input_ids"].squeeze(0)
        attention_mask = encoded["attention_mask"].squeeze(0)

        labels = input_ids.clone()  # For causal LM
        # Mask by attention, not by token id: the pad token may be the same id
        # as a real end-of-text token that should still be learned.
        labels[attention_mask == 0] = self.IGNORE_INDEX

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }
def sft_on_dense_examples(model, tokenizer, examples: List[Dict], config: DenseTrainConfig):
    """Supervised fine-tuning on gold-standard dense examples.

    Changes from the previous version:
      * the batch size comes from ``config.sft_batch_size`` (default 1, the
        value that used to be hard-coded);
      * gradients left over when the number of batches is not a multiple of
        ``config.gradient_accumulation`` are applied at the end of the epoch
        instead of silently carrying into the next one;
      * an example list with no ``"gold_dense"`` records returns immediately
        instead of dividing by zero when averaging the loss.

    Args:
        model: Causal LM to train in place; must return ``.loss`` when called
            with ``labels``.
        tokenizer: Tokenizer handed to ``DenseExampleDataset``.
        examples: Mixed records; only ``"gold_dense"`` ones are used.
        config: Supplies epochs, learning rate, batch size, accumulation
            steps and the gradient-clipping norm.

    Returns:
        The same ``model`` object.
    """
    dataset = DenseExampleDataset(examples, tokenizer)
    dataloader = DataLoader(dataset, batch_size=config.sft_batch_size, shuffle=True)
    num_batches = len(dataloader)
    if num_batches == 0:
        logger.warning("SFT skipped: no 'gold_dense' examples were provided")
        return model

    optimizer = torch.optim.AdamW(model.parameters(), lr=config.sft_lr)
    model.train()

    for epoch in range(config.sft_epochs):
        total_loss = 0
        optimizer.zero_grad()
        for batch_idx, batch in enumerate(tqdm(dataloader, desc=f"SFT Epoch {epoch+1}")):
            input_ids = batch["input_ids"].to(model.device)
            attention_mask = batch["attention_mask"].to(model.device)
            labels = batch["labels"].to(model.device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            # Scale so the accumulated gradient averages over the micro-batches.
            loss = outputs.loss / config.gradient_accumulation
            loss.backward()

            window_full = (batch_idx + 1) % config.gradient_accumulation == 0
            last_batch = batch_idx + 1 == num_batches
            if window_full or last_batch:
                torch.nn.utils.clip_grad_norm_(model.parameters(), config.max_grad_norm)
                optimizer.step()
                optimizer.zero_grad()

            # Undo the scaling so the logged value is the true per-batch loss.
            total_loss += loss.item() * config.gradient_accumulation

        avg_loss = total_loss / num_batches
        logger.info(f"SFT Epoch {epoch+1} | Loss: {avg_loss:.4f}")

    return model
# ═══════════════════════════════════════════════════════════════════════════════
# STAGE 3: DIRECT PREFERENCE OPTIMIZATION (DPO)
# ═══════════════════════════════════════════════════════════════════════════════
class PreferencePairDataset(Dataset):
    """Dataset for DPO training on (prompt, chosen, rejected) triplets.

    Only records whose ``"type"`` is ``"preference_pair"`` are kept. The
    chosen and rejected answers are each appended to the same chat-formatted
    prompt and tokenized separately, padded to ``max_length``.

    Change from the previous version: only the leading batch dimension is
    squeezed. A bare ``.squeeze()`` also collapsed the sequence dimension
    whenever it had length 1, yielding 0-d tensors.
    """

    def __init__(self, examples: List[Dict], tokenizer, max_length: int = 512):
        self.examples = [e for e in examples if e.get("type") == "preference_pair"]
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.examples)

    def _encode(self, text: str):
        """Tokenize one full conversation to fixed-length tensors."""
        return self.tokenizer(
            text,
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )

    def __getitem__(self, idx):
        """Return ids and attention masks for the chosen and rejected texts."""
        ex = self.examples[idx]
        prompt = f"<|im_start|>user\n{ex['prompt']}<|im_end|>\n<|im_start|>assistant\n"
        chosen_enc = self._encode(prompt + ex['chosen'] + "<|im_end|>")
        rejected_enc = self._encode(prompt + ex['rejected'] + "<|im_end|>")

        return {
            "chosen_input_ids": chosen_enc["input_ids"].squeeze(0),
            "chosen_attention_mask": chosen_enc["attention_mask"].squeeze(0),
            "rejected_input_ids": rejected_enc["input_ids"].squeeze(0),
            "rejected_attention_mask": rejected_enc["attention_mask"].squeeze(0),
        }
def dpo_loss(model, ref_model, batch, beta: float = 0.1):
    """
    Compute the DPO loss for one batch of preference pairs.

    L_DPO = -log σ(β[(log π(y_w) - log π_ref(y_w)) - (log π(y_l) - log π_ref(y_l))])

    As implemented here, each "log π" term is the mean per-token
    log-probability over every non-padded position of the tokenized text
    (the prompt tokens are part of that text), i.e. it is length-normalised
    rather than a sum over response tokens.

    Args:
        model: Policy network; called with ``input_ids`` / ``attention_mask``
            and expected to expose ``.logits``.
        ref_model: Reference network with the same call contract; evaluated
            without gradient tracking.
        batch: Mapping with ``chosen_input_ids``, ``chosen_attention_mask``,
            ``rejected_input_ids`` and ``rejected_attention_mask``.
        beta: Scale applied to the log-ratio difference.

    Returns:
        Scalar loss tensor (mean over the batch).
    """

    def mean_token_log_prob(net, side):
        # Position t predicts token t+1, so drop the last logit and the first id.
        ids = batch[f"{side}_input_ids"]
        mask = batch[f"{side}_attention_mask"]
        logits = net(input_ids=ids, attention_mask=mask).logits
        per_vocab = F.log_softmax(logits[:, :-1, :], dim=-1)
        targets = ids[:, 1:].unsqueeze(-1)
        per_token = per_vocab.gather(2, targets).squeeze(-1)
        target_mask = mask[:, 1:]
        return (per_token * target_mask).sum(dim=1) / target_mask.sum(dim=1)

    # Policy passes first (with gradients), chosen before rejected.
    pi_chosen = mean_token_log_prob(model, "chosen")
    pi_rejected = mean_token_log_prob(model, "rejected")

    # Reference passes carry no gradient.
    with torch.no_grad():
        ref_chosen = mean_token_log_prob(ref_model, "chosen")
        ref_rejected = mean_token_log_prob(ref_model, "rejected")

    chosen_margin = pi_chosen - ref_chosen
    rejected_margin = pi_rejected - ref_rejected
    logits_diff = beta * (chosen_margin - rejected_margin)
    return -F.logsigmoid(logits_diff).mean()
def dpo_train(model, ref_model, tokenizer, examples: List[Dict], config: DenseTrainConfig):
    """Direct Preference Optimization training.

    Changes from the previous version:
      * gradients left over when the number of batches is not a multiple of
        ``config.gradient_accumulation`` are applied at the end of the epoch
        instead of carrying into the next one;
      * an example list with no ``"preference_pair"`` records returns
        immediately instead of dividing by zero when averaging the loss.

    Args:
        model: Policy network, updated in place.
        ref_model: Reference network; put in eval mode and never stepped.
        tokenizer: Tokenizer handed to ``PreferencePairDataset``.
        examples: Mixed records; only ``"preference_pair"`` ones are used.
        config: Supplies epochs, learning rate, beta, accumulation steps and
            the gradient-clipping norm.

    Returns:
        The same ``model`` object.
    """
    dataset = PreferencePairDataset(examples, tokenizer)
    dataloader = DataLoader(dataset, batch_size=2, shuffle=True)  # Smaller batch for memory
    num_batches = len(dataloader)
    if num_batches == 0:
        logger.warning("DPO skipped: no 'preference_pair' examples were provided")
        return model

    optimizer = torch.optim.AdamW(model.parameters(), lr=config.dpo_lr)
    model.train()
    ref_model.eval()

    for epoch in range(config.dpo_epochs):
        total_loss = 0
        optimizer.zero_grad()
        for batch_idx, batch in enumerate(tqdm(dataloader, desc=f"DPO Epoch {epoch+1}")):
            batch = {k: v.to(model.device) for k, v in batch.items()}

            loss = dpo_loss(model, ref_model, batch, beta=config.dpo_beta)
            # Scale so the accumulated gradient averages over the micro-batches.
            loss = loss / config.gradient_accumulation
            loss.backward()

            window_full = (batch_idx + 1) % config.gradient_accumulation == 0
            last_batch = batch_idx + 1 == num_batches
            if window_full or last_batch:
                torch.nn.utils.clip_grad_norm_(model.parameters(), config.max_grad_norm)
                optimizer.step()
                optimizer.zero_grad()

            # Undo the scaling so the logged value is the true per-batch loss.
            total_loss += loss.item() * config.gradient_accumulation

        avg_loss = total_loss / num_batches
        logger.info(f"DPO Epoch {epoch+1} | Loss: {avg_loss:.4f}")

    return model
# ═══════════════════════════════════════════════════════════════════════════════
# STAGE 4: REINFORCEMENT LEARNING WITH CALIBRATED REWARD
# ═══════════════════════════════════════════════════════════════════════════════
class DensityRewardModel:
    """
    Heuristic reward for information density with a running baseline.

    The reward is derived from a density analyzer's output:
    1. The analyzer's ``total_score`` is normalised and clamped to [0, 1].
    2. Small bonuses/penalties are added for quality thresholds and length.
    3. The result is mapped affinely so a normalised score in [0, 1] lands
       in [0.2, 0.8]; bonuses can push the final value slightly outside
       (roughly 0.14 to 1.04).
    4. An exponential moving average of past rewards is subtracted to give
       an advantage (variance reduction).
    """

    def __init__(self, analyzer: "DensityAnalyzer", baseline_ema: float = 0.99):
        """
        Args:
            analyzer: Object whose ``analyze(text)`` returns a mapping with
                the keys ``total_score``, ``tech_count``, ``filler_count``,
                ``claim_density`` and ``tokens``.
            baseline_ema: Decay factor of the moving-average baseline.
        """
        self.analyzer = analyzer
        # The baseline starts at zero, so early advantages are close to the
        # raw reward until the moving average has warmed up.
        self.baseline = 0.0
        self.baseline_ema = baseline_ema

    def compute_reward(self, response: str, prompt_complexity: float = 1.0) -> "tuple[float, float, dict]":
        """Score ``response`` and update the running baseline.

        Args:
            response: Generated text to evaluate.
            prompt_complexity: Accepted for interface compatibility;
                currently not used in the computation.

        Returns:
            ``(scaled_reward, advantage, analysis)`` where ``advantage`` is
            the reward minus the baseline *before* this call's update and
            ``analysis`` is the analyzer's result for ``response``.
        """
        analysis = self.analyzer.analyze(response)

        # Base score from analyzer; 35 -> 0.5 and 70 -> 1.0 after scaling.
        density_score = analysis["total_score"]
        normalized = density_score / 70.0
        normalized = max(0, min(1, normalized))

        # Bonus for meeting quality thresholds.
        bonus = 0
        if analysis["tech_count"] >= 3:
            bonus += 0.1
        if analysis["filler_count"] == 0:
            bonus += 0.1
        if analysis["claim_density"] > 0.3:
            bonus += 0.1

        # Token efficiency: reward short answers, penalise long ones.
        tokens = analysis["tokens"]
        if tokens < 80:
            bonus += 0.1
        elif tokens > 200:
            bonus -= 0.1

        # raw_reward lies in [-0.1, 1.4]; it is intentionally not clamped.
        raw_reward = normalized + bonus

        # Affine map: [0, 1] -> [0.2, 0.8].
        scaled_reward = 0.2 + raw_reward * 0.6

        # Baseline subtraction for variance reduction, using the baseline
        # as it was before seeing this sample.
        advantage = scaled_reward - self.baseline

        # Update baseline with EMA.
        self.baseline = self.baseline_ema * self.baseline + (1 - self.baseline_ema) * scaled_reward

        return scaled_reward, advantage, analysis
def rl_dense_train(model, tokenizer, reward_model: DensityRewardModel,
                   prompts: List[str], config: DenseTrainConfig):
    """
    REINFORCE-style fine-tuning driven by the density reward.

    Each step samples one prompt, generates a single response, scores it
    with ``reward_model`` and takes a policy-gradient step weighted by the
    returned advantage, plus a small entropy bonus for exploration.

    Args:
        model: Causal LM to train; updated in place.
        tokenizer: Tokenizer matching ``model``.
        reward_model: Provides ``compute_reward(response)`` returning
            ``(reward, advantage, analysis)``.
        prompts: Pool of user prompts sampled uniformly at random.
        config: Provides ``rl_lr``, ``rl_steps``, ``gradient_accumulation``,
            ``max_grad_norm`` and ``checkpoint_every``.

    Returns:
        The same ``model`` object after training.
    """
    optimizer = torch.optim.AdamW(model.parameters(), lr=config.rl_lr)
    accum_steps = config.gradient_accumulation
    optimizer.zero_grad()
    model.train()

    for step in range(config.rl_steps):
        prompt = random.choice(prompts)
        formatted = f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n"
        inputs = tokenizer(formatted, return_tensors="pt").to(model.device)
        prompt_len = inputs.input_ids.shape[1]

        # Generate (per-step scores are not requested: they were never read
        # and only cost memory).
        model.eval()
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=150,
                do_sample=True,
                temperature=0.7,
                pad_token_id=tokenizer.eos_token_id,
                return_dict_in_generate=True,
            )
        sequences = outputs.sequences
        response = tokenizer.decode(sequences[0][prompt_len:], skip_special_tokens=True)

        # Compute reward
        reward, advantage, analysis = reward_model.compute_reward(response)

        # Policy gradient
        model.train()
        logits = model(sequences, return_dict=True).logits
        shift_logits = logits[:, :-1, :].float()
        shift_labels = sequences[:, 1:]
        log_probs = F.log_softmax(shift_logits, dim=-1)
        selected_log_probs = log_probs.gather(2, shift_labels.unsqueeze(-1)).squeeze(-1)

        # Only generated tokens take part in the objective. Position i of the
        # shifted tensors predicts token i + 1, so the response starts at
        # index prompt_len - 1. The mask is positional rather than pad-based:
        # a single sequence is generated (no padding), and the pad id passed
        # to generate() is the EOS id, so a pad-based mask would drop the
        # generated EOS and the model would get no signal about stopping.
        mask = torch.zeros_like(selected_log_probs)
        mask[:, prompt_len - 1:] = 1.0
        response_tokens = mask.sum(dim=1).clamp(min=1)
        seq_log_prob = (selected_log_probs * mask).sum(dim=1) / response_tokens

        # Entropy bonus for exploration, computed in float32 from the same
        # log-probs and averaged over response positions only.
        token_entropy = -(log_probs.exp() * log_probs).sum(dim=-1)
        entropy = (token_entropy * mask).sum() / mask.sum().clamp(min=1)
        entropy_bonus = 0.01 * entropy

        # Loss with advantage (not raw reward), scaled so that the
        # accumulated gradient is a mean over the accumulation window.
        loss = -(seq_log_prob * advantage).mean() - entropy_bonus
        (loss / accum_steps).backward()

        if (step + 1) % accum_steps == 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), config.max_grad_norm)
            optimizer.step()
            optimizer.zero_grad()

        # Logging
        if step % 25 == 0:
            logger.info(f"Step {step:5d} | Reward: {reward:.3f} | Adv: {advantage:.3f} | "
                        f"Density: {analysis['total_score']:.1f} | Tokens: {analysis['tokens']}")

        # Checkpoint
        if step % config.checkpoint_every == 0 and step > 0:
            save_path = Path(f"./dense_checkpoints_v2/step_{step}")
            save_path.mkdir(parents=True, exist_ok=True)
            model.save_pretrained(save_path)
            logger.info(f"Saved checkpoint at step {step}")

    # Apply gradients from a trailing, incomplete accumulation window so the
    # last few samples are not silently discarded.
    if config.rl_steps % accum_steps != 0:
        torch.nn.utils.clip_grad_norm_(model.parameters(), config.max_grad_norm)
        optimizer.step()
        optimizer.zero_grad()

    return model
# ═══════════════════════════════════════════════════════════════════════════════
# MASTER PIPELINE
# ═══════════════════════════════════════════════════════════════════════════════
class TheDensePipeline:
    """
    THE CONDENSATOR - Ultimate Dense Training Pipeline

    Stages run by ``run_full_pipeline``:
    1. Create the gold-standard dense examples
    2. SFT on those examples
    3. DPO on preference pairs built from the same examples
    4. RL refinement with the density reward
    """

    @staticmethod
    def _quantization_config():
        """Return the 4-bit NF4 quantisation config shared by policy and reference."""
        return BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_quant_type="nf4"
        )

    def __init__(self, model_path: str, device: str = "cuda"):
        """Load the tokenizer and the 4-bit base model, then attach a LoRA adapter.

        Args:
            model_path: Path or hub id passed to ``from_pretrained``.
            device: Stored as ``self.device``. The model itself is placed
                with ``device_map="auto"``.
        """
        self.device = torch.device(device)
        self.model_path = model_path
        self.analyzer = DensityAnalyzer()

        # Load model
        logger.info("Loading model...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        # Reuse EOS as the padding token.
        self.tokenizer.pad_token = self.tokenizer.eos_token

        self.model = AutoModelForCausalLM.from_pretrained(
            model_path,
            quantization_config=self._quantization_config(),
            device_map="auto",
            torch_dtype=torch.bfloat16
        )

        # Add LoRA
        lora_config = LoraConfig(
            r=16,
            lora_alpha=32,
            target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
                            "gate_proj", "up_proj", "down_proj"],
            lora_dropout=0.05,
            bias="none",
            task_type="CAUSAL_LM"
        )
        self.model = get_peft_model(self.model, lora_config)
        self.model.gradient_checkpointing_enable()
        # Checkpointing is switched on after the adapter was attached and the
        # base weights are frozen, so make the embedding output require grad
        # explicitly; otherwise checkpointed blocks may see inputs without
        # grad and the adapter weights would receive no gradient.
        self.model.enable_input_require_grads()
        logger.info("Model loaded with LoRA adapter")

    def run_full_pipeline(self, data_config: DenseDataConfig, train_config: DenseTrainConfig):
        """Execute the full 4-stage pipeline.

        Args:
            data_config: Supplies ``output_dir`` for the dataset and checkpoints.
            train_config: Hyper-parameters forwarded to the SFT, DPO and RL stages.

        Returns:
            The trained model (also kept on ``self.model``).
        """
        output_dir = Path(data_config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # ---------------------------------------------------------------
        # STAGE 1: Create gold standard data
        # ---------------------------------------------------------------
        logger.info("=" * 60)
        logger.info("STAGE 1: Creating gold standard dense examples")
        logger.info("=" * 60)
        gold_path = output_dir / "gold_dense_examples.json"
        examples = create_gold_standard_dataset(str(gold_path))

        # ---------------------------------------------------------------
        # STAGE 2: SFT on dense examples
        # ---------------------------------------------------------------
        logger.info("=" * 60)
        logger.info("STAGE 2: Supervised Fine-Tuning on dense examples")
        logger.info("=" * 60)
        self.model = sft_on_dense_examples(
            self.model, self.tokenizer, examples, train_config
        )

        # Save SFT checkpoint
        sft_path = output_dir / "sft_checkpoint"
        self.model.save_pretrained(sft_path)
        logger.info(f"Saved SFT checkpoint to {sft_path}")

        # ---------------------------------------------------------------
        # STAGE 3: DPO training
        # ---------------------------------------------------------------
        logger.info("=" * 60)
        logger.info("STAGE 3: Direct Preference Optimization")
        logger.info("=" * 60)

        # Load reference model for DPO. NOTE(review): this is the original
        # base checkpoint, not the post-SFT policy - confirm that is the
        # intended reference.
        ref_model = AutoModelForCausalLM.from_pretrained(
            self.model_path,
            quantization_config=self._quantization_config(),
            device_map="auto",
            torch_dtype=torch.bfloat16
        )
        self.model = dpo_train(
            self.model, ref_model, self.tokenizer, examples, train_config
        )

        # Clean up reference model and hand its cached blocks back to the
        # device before the RL stage allocates generation buffers.
        del ref_model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Save DPO checkpoint
        dpo_path = output_dir / "dpo_checkpoint"
        self.model.save_pretrained(dpo_path)
        logger.info(f"Saved DPO checkpoint to {dpo_path}")

        # ---------------------------------------------------------------
        # STAGE 4: RL refinement
        # ---------------------------------------------------------------
        logger.info("=" * 60)
        logger.info("STAGE 4: RL Refinement with Calibrated Rewards")
        logger.info("=" * 60)
        reward_model = DensityRewardModel(self.analyzer)

        # Technical prompts for RL
        rl_prompts = [
            "What is recursion?",
            "Explain gradient descent",
            "How does attention work?",
            "What is entropy?",
            "Explain backpropagation",
            "What is a hash table?",
            "Explain P vs NP",
            "How does LSTM work?",
            "What is Bayes' theorem?",
            "Explain neural networks",
            "What is consciousness?",
            "How does encryption work?",
            "Explain quantum computing",
            "What is machine learning?",
            "How does DNA replication work?",
            "Explain the transformer architecture",
            "What is reinforcement learning?",
            "How does the immune system work?",
            "Explain general relativity",
            "What is evolutionary computation?",
        ]
        self.model = rl_dense_train(
            self.model, self.tokenizer, reward_model, rl_prompts, train_config
        )

        # Save final checkpoint
        final_path = output_dir / "final_dense_model"
        self.model.save_pretrained(final_path)
        logger.info(f"Saved final model to {final_path}")

        logger.info("=" * 60)
        logger.info("PIPELINE COMPLETE!")
        logger.info("=" * 60)
        return self.model

    def test_model(self, prompts: List[str] = None):
        """Generate a response per prompt and print its density analysis.

        Args:
            prompts: Prompts to try; a built-in set of three is used when
                ``None``.
        """
        if prompts is None:
            prompts = [
                "What is recursion?",
                "Explain how attention works in transformers",
                "What is consciousness?",
            ]

        self.model.eval()
        print("\n" + "=" * 70)
        print("DENSITY TEST RESULTS")
        print("=" * 70)

        for prompt in prompts:
            formatted = f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n"
            # Follow the model's actual placement (it is loaded with
            # device_map="auto") rather than the requested device string.
            inputs = self.tokenizer(formatted, return_tensors="pt").to(self.model.device)
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=150,
                    do_sample=True,
                    temperature=0.7,
                    pad_token_id=self.tokenizer.eos_token_id
                )
            # Decode only the newly generated tokens.
            response = self.tokenizer.decode(
                outputs[0][inputs.input_ids.shape[1]:],
                skip_special_tokens=True
            )
            analysis = self.analyzer.analyze(response)

            print(f"\nPROMPT: {prompt}")
            print(f"DENSITY SCORE: {analysis['total_score']:.1f}")
            print(f"TOKENS: {analysis['tokens']}")
            print(f"TECH TERMS: {analysis['tech_count']}")
            print(f"FILLER: {analysis['filler_count']}")
            print(f"RESPONSE: {response[:300]}...")
            print("-" * 70)
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN ENTRY POINT
# ═══════════════════════════════════════════════════════════════════════════════
def main():
    """Command-line entry point: parse options, build the pipeline, train and/or test."""
    import argparse

    cli = argparse.ArgumentParser(description="THE CONDENSATOR - Ultimate Dense Training")

    # (flag, add_argument keyword arguments), registered in display order.
    option_specs = (
        ("--model", {"type": str, "required": True, "help": "Path to base model"}),
        ("--output", {"type": str, "default": "./dense_pipeline_output", "help": "Output directory"}),
        ("--sft-epochs", {"type": int, "default": 3, "help": "SFT epochs"}),
        ("--dpo-epochs", {"type": int, "default": 2, "help": "DPO epochs"}),
        ("--rl-steps", {"type": int, "default": 5000, "help": "RL refinement steps"}),
        ("--test-only", {"action": "store_true", "help": "Only test existing model"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    data_cfg = DenseDataConfig(output_dir=opts.output)
    train_cfg = DenseTrainConfig(
        sft_epochs=opts.sft_epochs,
        dpo_epochs=opts.dpo_epochs,
        rl_steps=opts.rl_steps,
    )

    condensator = TheDensePipeline(opts.model)

    # Training is skipped in test-only mode; the density test always runs.
    if not opts.test_only:
        condensator.run_full_pipeline(data_cfg, train_cfg)
    condensator.test_model()


if __name__ == "__main__":
    main()