Spaces:
Sleeping
Sleeping
| """ | |
| SEMANTIC EMBEDDER | |
| Lightweight embedding engine for manifold pathfinding. | |
| Uses sentence-transformers (all-MiniLM-L6-v2) for 384-dim vectors. | |
| Falls back to simple TF-IDF if transformers unavailable. | |
| """ | |
| import sys | |
| import os | |
| import json | |
| import math | |
| import hashlib | |
| from typing import List, Dict | |
| # Try to import sentence-transformers | |
| try: | |
| from sentence_transformers import SentenceTransformer | |
| HAS_TRANSFORMERS = True | |
| except ImportError: | |
| HAS_TRANSFORMERS = False | |
| print("[EMBEDDER]: sentence-transformers not available, using fallback") | |
class SemanticEmbedder:
    """
    Generates semantic embeddings for text.

    Uses the sentence-transformers model when the module-level
    HAS_TRANSFORMERS flag is set, otherwise a hashed bag-of-features
    fallback. Results are cached in memory and persisted to a JSON file
    to avoid recomputation.
    """

    # Fallback-mode cache entries live under their own key namespace so they
    # can never be confused with (or overwrite) transformer embeddings stored
    # in the same cache file. Entries written under the bare MD5 key by older
    # fallback runs used the per-process salted hash() and are ignored.
    _FALLBACK_KEY_PREFIX = "fallback-v2:"

    def __init__(self):
        """Locate and load the on-disk cache, then select the embedding mode."""
        # Cache lives at <this file's dir>/../Lattice_DB/embedding_cache.json
        self.cache_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "..",
            "Lattice_DB",
            "embedding_cache.json"
        )
        self.cache = self.load_cache()

        # Initialize model
        if HAS_TRANSFORMERS:
            print("[EMBEDDER]: Loading sentence-transformers model...")
            self.model = SentenceTransformer('all-MiniLM-L6-v2')
            self.embed_dim = 384
            self.mode = "transformers"
            print("[EMBEDDER]: Loaded (384-dim vectors)")
        else:
            self.model = None
            self.embed_dim = 128  # Fallback dimension
            self.mode = "fallback"
            print("[EMBEDDER]: Using fallback embeddings (128-dim)")

    def load_cache(self):
        """
        Load embedding cache from disk.

        Returns:
            The cached mapping, or an empty dict when the file is missing,
            unreadable, not valid JSON, or does not contain a JSON object.
        """
        try:
            with open(self.cache_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError. The cache is
            # best-effort, so start empty rather than fail.
            return {}
        return data if isinstance(data, dict) else {}

    def save_cache(self):
        """
        Save embedding cache to disk.

        The data is written to a sibling temp file and then moved into place
        with os.replace, so an interrupted write does not leave a truncated
        cache file behind.
        """
        directory = os.path.dirname(self.cache_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        tmp_path = self.cache_path + ".tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.cache, f)
        os.replace(tmp_path, self.cache_path)

    def _cache_key(self, text: str) -> str:
        """Return the cache key for text under the current embedding mode."""
        digest = hashlib.md5(text.encode()).hexdigest()
        if self.mode == "transformers":
            # Bare digest: same key format as before for transformer entries.
            return digest
        return self._FALLBACK_KEY_PREFIX + digest

    def embed_text(self, text: str) -> List[float]:
        """
        Generate semantic embedding for text.

        Args:
            text: Input text to embed

        Returns:
            Vector of dimension self.embed_dim
        """
        # Check cache first; only trust entries of the current dimension so
        # a stale entry produced in another mode is recomputed.
        cache_key = self._cache_key(text)
        cached = self.cache.get(cache_key)
        if isinstance(cached, list) and len(cached) == self.embed_dim:
            return cached

        # Generate embedding
        if self.mode == "transformers":
            embedding = self._embed_transformers(text)
        else:
            embedding = self._embed_fallback(text)

        # Cache result
        self.cache[cache_key] = embedding

        # Persist whenever the cache size reaches a multiple of 10
        if len(self.cache) % 10 == 0:
            self.save_cache()
        return embedding

    def _embed_transformers(self, text: str) -> List[float]:
        """Use sentence-transformers to generate embedding."""
        embedding = self.model.encode(text, convert_to_numpy=True)
        return embedding.tolist()

    @staticmethod
    def _bucket(feature: str, dim: int) -> int:
        """
        Map a feature string to a vector index in [0, dim).

        Derived from an MD5 digest rather than built-in hash(): str hashes
        are salted per interpreter process, which would make embeddings
        saved to the on-disk cache incomparable with ones computed later.
        """
        digest = hashlib.md5(feature.encode("utf-8", "surrogatepass")).digest()
        return int.from_bytes(digest[:8], "big") % dim

    def _embed_fallback(self, text: str) -> List[float]:
        """
        Fallback embedding using a hashed bag of tokens and character trigrams.

        Each whitespace-separated lowercase token adds 1.0 and each lowercase
        character trigram adds 0.5 to the bucket chosen by _bucket(). The
        vector is then L2-normalized (left all-zero for empty input).
        """
        # Tokenize
        tokens = text.lower().split()

        # Character n-grams for robustness
        char_ngrams = [text[i:i + 3].lower() for i in range(len(text) - 2)]

        # Create sparse vector
        vector = [0.0] * self.embed_dim

        # Hash tokens into vector dimensions
        for token in tokens:
            vector[self._bucket(token, self.embed_dim)] += 1.0

        # Hash character n-grams
        for ngram in char_ngrams:
            vector[self._bucket(ngram, self.embed_dim)] += 0.5

        # Normalize
        magnitude = math.sqrt(sum(x * x for x in vector))
        if magnitude > 0:
            vector = [x / magnitude for x in vector]
        return vector

    def cosine_similarity(self, vec_a: List[float], vec_b: List[float]) -> float:
        """
        Calculate cosine similarity between two vectors.

        Returns:
            Similarity score in [0, 1] (higher = more similar). Negative
            cosines are clamped to 0.0, and 0.0 is returned when either
            vector has zero magnitude.

        Raises:
            ValueError: If the vectors have different lengths.
        """
        if len(vec_a) != len(vec_b):
            raise ValueError(f"Vector dimension mismatch: {len(vec_a)} vs {len(vec_b)}")

        # Dot product
        dot_product = sum(a * b for a, b in zip(vec_a, vec_b))

        # Magnitudes
        mag_a = math.sqrt(sum(a * a for a in vec_a))
        mag_b = math.sqrt(sum(b * b for b in vec_b))
        if mag_a == 0 or mag_b == 0:
            return 0.0

        similarity = dot_product / (mag_a * mag_b)

        # Clamp to [0, 1]
        return max(0.0, min(1.0, similarity))

    def get_cached_embedding(self, text: str) -> List[float]:
        """
        Get embedding from cache if available, otherwise generate.
        Same as embed_text() but explicit about caching.
        """
        return self.embed_text(text)

    def clear_cache(self):
        """Clear the in-memory cache and delete the cache file if it exists."""
        self.cache = {}
        if os.path.exists(self.cache_path):
            os.remove(self.cache_path)
        print("[EMBEDDER]: Cache cleared")
if __name__ == "__main__":
    # Manual smoke test: embeds a few strings, prints a similarity matrix,
    # and compares an uncached call against a cached one.
    import time

    print("="*60)
    print("SEMANTIC EMBEDDER - Test Suite")
    print("="*60 + "\n")

    embedder = SemanticEmbedder()

    # Test 1: Basic embedding
    print("Test 1: Basic Embedding")
    text = "React hooks allow functional components to use state"
    embedding = embedder.embed_text(text)
    print(f" Text: '{text}'")
    print(f" Embedding dim: {len(embedding)}")
    print(f" First 5 values: {embedding[:5]}")

    # Test 2: Similarity between related concepts
    print("\nTest 2: Semantic Similarity")
    concepts = [
        "React hooks and useEffect",
        "Functional components with state management",
        "Database connection pooling",
        "Singleton design pattern"
    ]
    embeddings = [embedder.embed_text(c) for c in concepts]

    print("\nSimilarity Matrix:")
    for i in range(len(concepts)):
        # Only upper triangle (including the diagonal)
        for j in range(i, len(concepts)):
            sim = embedder.cosine_similarity(embeddings[i], embeddings[j])
            print(f" [{i}] ↔ [{j}]: {sim:.3f}")

    print("\nConcept Labels:")
    for i, c in enumerate(concepts):
        print(f" [{i}]: {c}")

    # Test 3: Cache performance
    print("\nTest 3: Cache Performance")
    test_text = "This is a test string for cache performance"

    # perf_counter is used instead of time.time(): the cached call finishes
    # in microseconds, below the wall clock's resolution on some platforms.
    # First call (uncached unless a previous run already persisted it)
    start = time.perf_counter()
    _ = embedder.embed_text(test_text)
    first_time = time.perf_counter() - start

    # Second call (cached)
    start = time.perf_counter()
    _ = embedder.embed_text(test_text)
    second_time = time.perf_counter() - start

    print(f" First call: {first_time*1000:.2f}ms")
    print(f" Cached call: {second_time*1000:.2f}ms")
    if second_time > 0:
        print(f" Speedup: {first_time/second_time:.1f}x")
    else:
        print(" Speedup: >100x (instant cache)")

    # Save cache
    embedder.save_cache()

    print("\n✅ Embedder operational")
    print(f" Mode: {embedder.mode}")
    print(f" Dimension: {embedder.embed_dim}")
    print(f" Cached embeddings: {len(embedder.cache)}")