Spaces:
Build error
Build error
| """ | |
| Binary Semantic Encoder with Golden Ratio Sampling | |
| ================================================= | |
| Transforms TF-IDF vectors into binary fingerprints using SVD and phase collapse. | |
| Implements golden ratio sampling for optimal pattern capture. | |
| """ | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime | |
| import json | |
| import numpy as np | |
| import torch | |
| from tqdm import tqdm | |
| import traceback | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| logger = logging.getLogger(__name__) | |
| class GoldenRatioEncoder: | |
| """ | |
| Encodes text into binary fingerprints using quantum-inspired phase collapse. | |
| Based on quantum consciousness principles for optimal pattern capture. | |
| """ | |
| def __init__(self, n_bits=128, max_features=10000, device='cpu'): | |
| self.n_bits = n_bits | |
| self.max_features = max_features | |
| self.golden_ratio = (1 + np.sqrt(5)) / 2 | |
| self.device = device | |
| # Components to be learned | |
| self.vectorizer = None | |
| self.projection = None | |
| self.singular_values = None | |
| self.sample_indices = None | |
| self.training_stats = {} | |
| logger.info(f"Initialized GoldenRatioEncoder") | |
| logger.info(f" n_bits: {n_bits}") | |
| logger.info(f" max_features: {max_features}") | |
| logger.info(f" golden_ratio: {self.golden_ratio:.6f}") | |
| def _golden_ratio_sample(self, n_total, target_memory_gb=50): | |
| """ | |
| Sample using golden ratio until it fits in memory. | |
| Args: | |
| n_total: Total number of items | |
| target_memory_gb: Target memory usage | |
| Returns: | |
| sample_indices: Indices to sample | |
| """ | |
| # Calculate how many samples we can fit | |
| bytes_per_element = 4 # float32 | |
| elements_per_sample = self.max_features | |
| bytes_per_sample = bytes_per_element * elements_per_sample | |
| max_samples = int(target_memory_gb * 1e9 / bytes_per_sample) | |
| # Apply golden ratio reduction until it fits | |
| sample_size = n_total | |
| reduction_level = 0 | |
| while sample_size > max_samples: | |
| sample_size = int(sample_size / self.golden_ratio) | |
| reduction_level += 1 | |
| logger.info(f"Golden ratio sampling:") | |
| logger.info(f" Original: {n_total:,} samples") | |
| logger.info(f" Reduced: {sample_size:,} samples") | |
| logger.info(f" Reduction levels: {reduction_level}") | |
| logger.info(f" Coverage: {sample_size/n_total*100:.1f}%") | |
| # Create indices with logarithmic distribution | |
| if sample_size < n_total: | |
| indices = np.unique(np.logspace( | |
| 0, np.log10(n_total-1), sample_size | |
| ).astype(int)) | |
| else: | |
| indices = np.arange(n_total) | |
| logger.info(f" Selected {len(indices):,} unique indices") | |
| return indices | |
| def train(self, titles, memory_limit_gb=50, batch_size=10000): | |
| """ | |
| Train encoder using golden ratio sampling. | |
| This is the method called by the training script. | |
| Args: | |
| titles: List of all titles | |
| memory_limit_gb: Memory limit for computation | |
| batch_size: Not used in fit, but kept for compatibility | |
| """ | |
| self.fit(titles, memory_limit_gb) | |
| def fit(self, titles, memory_limit_gb=50): | |
| """ | |
| Fit encoder using golden ratio sampling. | |
| Args: | |
| titles: List of all titles | |
| memory_limit_gb: Memory limit for computation | |
| """ | |
| start_time = time.time() | |
| logger.info(f"Training encoder on {len(titles):,} titles...") | |
| # Step 1: Fit vectorizer on ALL titles (learns vocabulary) | |
| logger.info("Step 1: Learning vocabulary from all titles...") | |
| t0 = time.time() | |
| self.vectorizer = TfidfVectorizer( | |
| analyzer='char', | |
| ngram_range=(3, 5), | |
| max_features=self.max_features, | |
| lowercase=True, | |
| dtype=np.float32 | |
| ) | |
| self.vectorizer.fit(titles) | |
| vocab_size = len(self.vectorizer.vocabulary_) | |
| logger.info(f" Vocabulary size: {vocab_size:,}") | |
| logger.info(f" Time: {time.time() - t0:.2f}s") | |
| # Step 2: Golden ratio sampling | |
| logger.info("Step 2: Golden ratio sampling...") | |
| t0 = time.time() | |
| self.sample_indices = self._golden_ratio_sample( | |
| len(titles), memory_limit_gb | |
| ) | |
| sample_titles = [titles[i] for i in self.sample_indices] | |
| logger.info(f" Time: {time.time() - t0:.2f}s") | |
| # Step 3: Transform sample and compute SVD | |
| logger.info(f"Step 3: Transforming {len(sample_titles):,} sampled titles...") | |
| t0 = time.time() | |
| X_sample = self.vectorizer.transform(sample_titles) | |
| X_dense = X_sample.toarray() | |
| logger.info(f" Matrix shape: {X_dense.shape}") | |
| logger.info(f" Matrix memory: {X_dense.nbytes / 1e9:.2f} GB") | |
| # Convert to PyTorch for SVD | |
| X_tensor = torch.from_numpy(X_dense).float() | |
| if self.device != 'cpu' and torch.cuda.is_available(): | |
| X_tensor = X_tensor.to(self.device) | |
| logger.info(f" Time: {time.time() - t0:.2f}s") | |
| # Step 4: SVD with energy analysis | |
| logger.info("Step 4: Computing SVD with energy analysis...") | |
| t0 = time.time() | |
| U, S, Vh = torch.linalg.svd(X_tensor, full_matrices=False) | |
| # Energy analysis | |
| energy = S ** 2 | |
| total_energy = energy.sum() | |
| energy_threshold = energy.mean() | |
| # Find components above mean energy | |
| n_components = torch.sum(energy > energy_threshold).item() | |
| # Constrain to reasonable range | |
| n_components = np.clip(n_components, 64, min(self.n_bits, len(S))) | |
| # Calculate explained variance | |
| explained_variance = energy[:n_components].sum() / total_energy | |
| logger.info(f" Total singular values: {len(S)}") | |
| logger.info(f" Energy threshold: {energy_threshold:.2f}") | |
| logger.info(f" Selected components: {n_components}") | |
| logger.info(f" Explained variance: {explained_variance:.3f}") | |
| logger.info(f" Top 5 singular values: {S[:5].cpu().numpy()}") | |
| logger.info(f" Time: {time.time() - t0:.2f}s") | |
| # Step 5: Store projection matrix | |
| self.projection = Vh[:n_components].T.cpu().numpy() | |
| self.singular_values = S[:n_components].cpu().numpy() | |
| self.n_components = n_components | |
| # Step 6: Validate coherence | |
| logger.info("Step 5: Validating projection coherence...") | |
| t0 = time.time() | |
| coherence = self._validate_coherence() | |
| logger.info(f" Projection coherence: {coherence:.4f}") | |
| logger.info(f" Time: {time.time() - t0:.2f}s") | |
| # Store training statistics | |
| self.training_stats = { | |
| 'n_titles': len(titles), | |
| 'n_samples': len(sample_titles), | |
| 'sample_ratio': len(sample_titles) / len(titles), | |
| 'n_features': vocab_size, | |
| 'n_components': n_components, | |
| 'explained_variance': float(explained_variance), | |
| 'coherence': float(coherence), | |
| 'training_time': time.time() - start_time, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| logger.info(f"Training complete in {self.training_stats['training_time']:.2f}s") | |
| def encode(self, titles, batch_size=10000, show_progress=True): | |
| """ | |
| Transform titles to binary fingerprints. | |
| This method is called by the training script. | |
| Args: | |
| titles: Titles to encode | |
| batch_size: Processing batch size | |
| show_progress: Show progress bar | |
| Returns: | |
| Binary fingerprints tensor (n_titles, n_bits) | |
| """ | |
| return self.transform(titles, batch_size, show_progress) | |
| def transform(self, titles, batch_size=10000, show_progress=True): | |
| """ | |
| Transform titles to binary fingerprints. | |
| Args: | |
| titles: Titles to encode | |
| batch_size: Processing batch size | |
| show_progress: Show progress bar | |
| Returns: | |
| Binary fingerprints as torch tensor (n_titles, n_bits) | |
| """ | |
| if self.vectorizer is None: | |
| raise ValueError("Encoder must be fitted first") | |
| n_titles = len(titles) | |
| fingerprints = np.zeros((n_titles, self.n_bits), dtype=np.uint8) | |
| # Process in batches | |
| iterator = range(0, n_titles, batch_size) | |
| if show_progress: | |
| iterator = tqdm(iterator, desc="Encoding titles") | |
| for i in iterator: | |
| batch_end = min(i + batch_size, n_titles) | |
| batch = titles[i:batch_end] | |
| # Transform to TF-IDF | |
| X_batch = self.vectorizer.transform(batch) | |
| # Handle both sparse and dense matrices | |
| if hasattr(X_batch, 'toarray'): | |
| X_dense = X_batch.toarray() | |
| else: | |
| X_dense = X_batch # Already dense | |
| # Project using learned components | |
| X_projected = X_dense @ self.projection | |
| # Normalize to unit sphere | |
| norms = np.linalg.norm(X_projected, axis=1, keepdims=True) | |
| X_normalized = X_projected / (norms + 1e-8) | |
| # Extract binary phases | |
| binary = (X_normalized > 0).astype(np.uint8) | |
| # Store (handling case where n_components < n_bits) | |
| actual_bits = min(binary.shape[1], self.n_bits) | |
| fingerprints[i:batch_end, :actual_bits] = binary[:, :actual_bits] | |
| # Convert to PyTorch tensor for compatibility | |
| return torch.from_numpy(fingerprints) | |
| def encode_single(self, title): | |
| """Encode a single title.""" | |
| return self.encode([title], show_progress=False)[0] | |
| def _validate_coherence(self): | |
| """Measure coherence of projection using quantum principle.""" | |
| # Create random test vectors | |
| test_vectors = np.random.randn(100, self.projection.shape[0]) | |
| # Project | |
| projected = test_vectors @ self.projection | |
| # Convert to complex for phase analysis | |
| projected_complex = projected.astype(np.complex64) | |
| # Measure phase coherence | |
| phases = np.angle(np.sum(projected_complex, axis=1)) | |
| phase_factors = np.exp(1j * phases) | |
| coherence = np.abs(np.mean(phase_factors)) | |
| return coherence | |
| def save(self, save_dir): | |
| """Save encoder to disk.""" | |
| try: | |
| save_path = Path(save_dir) | |
| save_path.mkdir(parents=True, exist_ok=True) | |
| logger.info(f"Saving encoder to {save_path}") | |
| # Save vectorizer vocabulary and IDF as numpy arrays | |
| if self.vectorizer is None: | |
| raise ValueError("Cannot save encoder: vectorizer is None") | |
| vocab_items = sorted(self.vectorizer.vocabulary_.items(), key=lambda x: x[1]) | |
| vocab_array = np.array([item[0] for item in vocab_items], dtype=object) | |
| vocab_path = save_path / 'vocabulary.npy' | |
| logger.info(f"Saving vocabulary to {vocab_path}") | |
| np.save(vocab_path, vocab_array) | |
| idf_path = save_path / 'idf_weights.npy' | |
| logger.info(f"Saving IDF weights to {idf_path}") | |
| np.save(idf_path, self.vectorizer.idf_) | |
| # Save projection and parameters | |
| if self.projection is None: | |
| raise ValueError("Cannot save encoder: projection matrix is None") | |
| projection_path = save_path / 'projection.npy' | |
| logger.info(f"Saving projection matrix to {projection_path}") | |
| np.save(projection_path, self.projection) | |
| if self.singular_values is None: | |
| raise ValueError("Cannot save encoder: singular values are None") | |
| singular_path = save_path / 'singular_values.npy' | |
| logger.info(f"Saving singular values to {singular_path}") | |
| np.save(singular_path, self.singular_values) | |
| # Save configuration | |
| config = { | |
| 'n_bits': int(self.n_bits), | |
| 'n_components': int(self.n_components), | |
| 'max_features': int(self.max_features), | |
| 'golden_ratio': float(self.golden_ratio), | |
| 'sample_indices': self.sample_indices.tolist() if self.sample_indices is not None else None, | |
| 'training_stats': {k: (float(v) if isinstance(v, (np.floating, np.integer)) else v) | |
| for k, v in self.training_stats.items()} | |
| } | |
| config_path = save_path / 'config.json' | |
| logger.info(f"Saving config to {config_path}") | |
| with open(config_path, 'w') as f: | |
| json.dump(config, f, indent=2) | |
| # Verify all files were created | |
| expected_files = ['vocabulary.npy', 'idf_weights.npy', 'projection.npy', | |
| 'singular_values.npy', 'config.json'] | |
| for file in expected_files: | |
| file_path = save_path / file | |
| if not file_path.exists(): | |
| raise FileNotFoundError(f"Failed to save {file} - file does not exist after save") | |
| logger.info(f" Verified: {file} ({file_path.stat().st_size} bytes)") | |
| logger.info(f"Encoder saved successfully to {save_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to save encoder: {str(e)}") | |
| logger.error(f"Exception type: {type(e).__name__}") | |
| logger.error("Full traceback:") | |
| logger.error(traceback.format_exc()) | |
| raise | |
| def load(self, save_dir): | |
| """Load encoder from disk.""" | |
| save_path = Path(save_dir) | |
| # Load configuration | |
| with open(save_path / 'config.json', 'r') as f: | |
| config = json.load(f) | |
| self.n_bits = config['n_bits'] | |
| self.n_components = config['n_components'] | |
| self.max_features = config['max_features'] | |
| self.golden_ratio = config['golden_ratio'] | |
| self.training_stats = config.get('training_stats', {}) | |
| # Load projection and singular values | |
| self.projection = np.load(save_path / 'projection.npy') | |
| self.singular_values = np.load(save_path / 'singular_values.npy') | |
| # Recreate vectorizer | |
| vocab_array = np.load(save_path / 'vocabulary.npy', allow_pickle=True) | |
| self.vectorizer = TfidfVectorizer( | |
| analyzer='char', | |
| ngram_range=(3, 5), | |
| max_features=self.max_features, | |
| lowercase=True, | |
| dtype=np.float32 | |
| ) | |
| # Restore vocabulary | |
| self.vectorizer.vocabulary_ = {word: idx for idx, word in enumerate(vocab_array)} | |
| self.vectorizer.idf_ = np.load(save_path / 'idf_weights.npy') | |
| logger.info(f"Encoder loaded from {save_path}") |