# (extraction artifact removed: stray "Spaces:"/"Sleeping" status lines)
| """ | |
| IN-MEMORY PATTERN INDEX | |
| Fast lookup without HDD writes - merge existing + conversation + Gemini chat patterns | |
| """ | |
import json
import math
import os
import re
import sys
import time
| try: | |
| from System.semantic_embedder import SemanticEmbedder | |
| except ImportError: | |
| try: | |
| from semantic_embedder import SemanticEmbedder | |
| except ImportError: | |
| # Final fallback for scripts in Shop/ | |
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
| from semantic_embedder import SemanticEmbedder | |
| # Existing 5 lattice patterns | |
| LATTICE_PATTERNS = { | |
| "PATTERN_SINGLETON_DATABASE": { | |
| "lba": 8534859776, | |
| "domain": "SOFTWARE_ARCHITECTURE", | |
| "problem": "Need to ensure only one database connection exists", | |
| "solution": "Singleton pattern with thread-safe initialization", | |
| "reusability": 9, | |
| "confidence": 0.82 | |
| }, | |
| "PATTERN_REACT_HOOKS_DEPS": { | |
| "lba": 3371401216, | |
| "domain": "WEB_DEVELOPMENT", | |
| "problem": "React component not re-rendering when props change", | |
| "solution": "Add dependency array to useEffect", | |
| "reusability": 10, | |
| "confidence": 0.85 | |
| } | |
| } | |
| CONVERSATION_PATTERNS = { | |
| "AGENT_IS_LATTICE": { | |
| "domain": "CONCEPTUAL", | |
| "problem": "Separation between agent and data structure", | |
| "solution": "Agent is non-orientable surface - no inside/outside separation", | |
| "confidence": 0.95 | |
| } | |
| } | |
| class InMemoryIndex: | |
| """ | |
| Adaptive Distillation Index. | |
| Tracks pattern hit counts to distinguish signal from noise: | |
| - Once-patterns (1 hit) = UNCONFIRMED (might be noise) | |
| - Twice-patterns (2 hits) = PLAUSIBLE | |
| - Multi-patterns (3+ hits) = CONFIRMED (logic) | |
| The lattice self-cleans through use. Signal persists, noise decays. | |
| """ | |
| # Hit tracking file handled dynamically in __init__ | |
| HIT_LOG_PATH = None | |
| # Magnitude layers: logic exists in layers | |
| # Layer 0: Surface (keyword substring match) = low magnitude | |
| # Layer 1: Structural (multi-word + domain match) = medium magnitude | |
| # Layer 2: Conceptual (phrase match in problem/solution) = high magnitude | |
| # Decay: magnitude halves every DECAY_HALF_LIFE seconds without a hit | |
| DECAY_HALF_LIFE = 86400 # 24 hours | |
| MAGNITUDE_LAYERS = { | |
| "surface": 0.3, # keyword substring match (low relevance) | |
| "structural": 0.6, # multi-word + domain match (medium) | |
| "conceptual": 1.0, # full phrase match in problem/solution (high) | |
| } | |
| def __init__(self): | |
| # Handle relative pathing for portability | |
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| self.LATTICE_DB_DIR = os.path.join(BASE_DIR, "Lattice_DB") | |
| self.HIT_LOG_PATH = os.path.join(self.LATTICE_DB_DIR, "pattern_hits.json") | |
| index_path = os.path.join(self.LATTICE_DB_DIR, "dual_anchor_index.json") | |
| if os.path.exists(index_path): | |
| with open(index_path, 'r') as f: | |
| data = json.load(f) | |
| self.patterns = data.get('patterns', {}) | |
| sources = data.get('sources', {}) | |
| print(f"[INDEX] Loaded {len(self.patterns)} dual-anchor patterns") | |
| else: | |
| # Fallback to original patterns | |
| self.patterns = {} | |
| self.load_lattice_patterns() | |
| self.load_conversation_patterns() | |
| print("[INDEX] Dual-anchor index not found, using original 16 patterns") | |
| # Load hit tracking (magnitude-weighted) | |
| self.hits = self._load_hits() | |
| # Calculate adaptive threshold based on pattern count | |
| self.base_threshold = 0.3 + (0.4 * min(len(self.patterns) / 200, 1.0)) | |
| # Initialize Semantic Engine | |
| print("[INDEX] Initializing Semantic Manifold...") | |
| self.embedder = SemanticEmbedder() | |
| self.pattern_vectors = {} | |
| self._reindex_vectors() | |
| confirmed = sum(1 for h in self.hits.values() if self._total_magnitude(h) >= 2.0) | |
| unconfirmed = sum(1 for h in self.hits.values() if 0 < self._total_magnitude(h) < 1.0) | |
| print(f"[DISTILLER] Confirmed: {confirmed} | Unconfirmed: {unconfirmed} | Threshold: {self.base_threshold:.2f}") | |
| self.word_freq = self._calculate_word_freq() | |
| def _reindex_vectors(self): | |
| """Pre-calculates semantic embeddings for all known patterns.""" | |
| print(f"[INDEX]: Generating embeddings for {len(self.patterns)} patterns...") | |
| for label, p in self.patterns.items(): | |
| # Combine problem + solution for semantic context | |
| context = f"{p.get('problem', '')} {p.get('solution', '')} {label}" | |
| self.pattern_vectors[label] = self.embedder.embed_text(context) | |
| print(f"[INDEX]: ✅ Semantic manifold mapped ({len(self.pattern_vectors)} vectors).") | |
| def _calculate_word_freq(self): | |
| """Calculate inverse pattern frequency (IPF) for lean semantic weighting.""" | |
| freq = {} | |
| for p in self.patterns.values(): | |
| text = (p.get('problem','') + " " + p.get('solution','')).lower() | |
| words = set(re.findall(r'\w+', text)) | |
| for w in words: | |
| freq[w] = freq.get(w, 0) + 1 | |
| return freq | |
| def _get_word_weight(self, word, structural_weight): | |
| """Calculate semantic weight: rare words matter more.""" | |
| count = self.word_freq.get(word, 0) | |
| if count == 0: return structural_weight | |
| # Logarithmic scale for IPF: weight = 1 + log(total / count) | |
| import math | |
| ipf = 1.0 + math.log(len(self.patterns) / count) | |
| return structural_weight * ipf | |
| def _fuzzy_match(self, w1, w2): | |
| """Lightweight Jaccard similarity for fuzzy matching.""" | |
| if w1 == w2: return 1.0 | |
| if len(w1) < 4 or len(w2) < 4: return 0.0 | |
| s1, s2 = set(w1), set(w2) | |
| intersection = len(s1 & s2) | |
| union = len(s1 | s2) | |
| score = intersection / union | |
| return score if score > 0.7 else 0.0 | |
| def _load_hits(self): | |
| """Load magnitude-weighted hit data from disk.""" | |
| if os.path.exists(self.HIT_LOG_PATH): | |
| with open(self.HIT_LOG_PATH, 'r') as f: | |
| data = json.load(f) | |
| # Backward compat: convert flat counts to magnitude format | |
| for label, val in data.items(): | |
| if isinstance(val, (int, float)): | |
| data[label] = {"count": int(val), "magnitude": float(val) * 0.5, "layers": []} | |
| return data | |
| return {} | |
| def _save_hits(self): | |
| """Persist hit data to disk.""" | |
| with open(self.HIT_LOG_PATH, 'w') as f: | |
| json.dump(self.hits, f, indent=2) | |
| def _total_magnitude(self, hit_data): | |
| """Get current magnitude with decay applied.""" | |
| if isinstance(hit_data, dict): | |
| raw_mag = hit_data.get('magnitude', 0) | |
| last_hit = hit_data.get('last_hit', 0) | |
| if last_hit > 0 and raw_mag > 0: | |
| elapsed = time.time() - last_hit | |
| # Halve every DECAY_HALF_LIFE seconds | |
| decay_factor = 0.5 ** (elapsed / self.DECAY_HALF_LIFE) | |
| return raw_mag * decay_factor | |
| return raw_mag | |
| return float(hit_data) * 0.5 # backward compat | |
| def _classify_relevance(self, relevance): | |
| """Classify match into magnitude layer based on relevance score.""" | |
| if relevance >= 0.7: | |
| return "conceptual", self.MAGNITUDE_LAYERS["conceptual"] | |
| elif relevance >= 0.4: | |
| return "structural", self.MAGNITUDE_LAYERS["structural"] | |
| else: | |
| return "surface", self.MAGNITUDE_LAYERS["surface"] | |
| def _record_hit(self, label, relevance): | |
| """Record a hit. Re-mention restores magnitude to peak.""" | |
| layer_name, magnitude = self._classify_relevance(relevance) | |
| if label not in self.hits: | |
| self.hits[label] = {"count": 0, "magnitude": 0.0, "peak": 0.0, "layers": [], "last_hit": 0} | |
| h = self.hits[label] | |
| h["count"] += 1 | |
| h["last_hit"] = time.time() | |
| # Restore to peak first (re-mention recovery), then add new magnitude | |
| current_peak = h.get("peak", h["magnitude"]) | |
| h["magnitude"] = current_peak + magnitude | |
| h["peak"] = h["magnitude"] # new peak | |
| # Track which layers have been hit | |
| if layer_name not in h["layers"]: | |
| h["layers"].append(layer_name) | |
| def get_status(self, label): | |
| """Get distillation status based on decayed magnitude.""" | |
| hit_data = self.hits.get(label, {}) | |
| mag = self._total_magnitude(hit_data) # applies decay | |
| layers = hit_data.get('layers', []) if isinstance(hit_data, dict) else [] | |
| if mag == 0: | |
| return "NEW" | |
| elif mag < 1.0: | |
| return "UNCONFIRMED" # surface-only = might be noise | |
| elif mag < 2.0: | |
| return "PLAUSIBLE" | |
| elif len(layers) >= 2: | |
| return "DEEP_LOGIC" # hit at multiple layers = real | |
| else: | |
| return "CONFIRMED" # high magnitude single layer | |
| def add_note(self, text, domain="NOTE", forced_label=None): | |
| """Add a new pattern from freeform text. Self-organizing entry point.""" | |
| if forced_label: | |
| label = forced_label | |
| else: | |
| # Auto-generate label from text | |
| words = re.sub(r'[^a-zA-Z0-9\s]', '', text).upper().split() | |
| # Take first 4 meaningful words for label | |
| label_words = [w for w in words if len(w) > 2][:4] | |
| label = "_".join(label_words) if label_words else "NOTE_" + str(int(time.time())) | |
| # Don't overwrite existing patterns unless forced | |
| if label in self.patterns and not forced_label: | |
| label = label + "_" + str(int(time.time()) % 10000) | |
| self.patterns[label] = { | |
| "problem": text, | |
| "solution": text, | |
| "domain": domain, | |
| "confidence": 0.5, # starts neutral | |
| "source": "notepad", | |
| "type": "NOTE", | |
| "created": time.time(), | |
| } | |
| # Initial hit at conceptual layer (you wrote it = you meant it) | |
| self._record_hit(label, 1.0) | |
| self._save_hits() | |
| # Update threshold for new pattern count | |
| self.base_threshold = 0.3 + (0.4 * min(len(self.patterns) / 200, 1.0)) | |
| return label | |
| def load_lattice_patterns(self): | |
| """Load existing 5 patterns from lattice.""" | |
| for label, data in LATTICE_PATTERNS.items(): | |
| self.patterns[label] = { | |
| **data, | |
| "source": "lattice", | |
| "type": "CODE_PATTERN" | |
| } | |
| def load_conversation_patterns(self): | |
| """Load 11 patterns from this conversation.""" | |
| for label, data in CONVERSATION_PATTERNS.items(): | |
| self.patterns[label] = { | |
| **data, | |
| "source": "conversation_0938ac6c", | |
| "type": "INSIGHT" | |
| } | |
| def search(self, query, threshold=None, record=True): | |
| """ | |
| Adaptive distillation search. | |
| - Matches patterns using phrase + word relevance | |
| - Integrates 384-dim semantic similarity from manifolds | |
| - Records hits for matched patterns | |
| """ | |
| if threshold is None: | |
| threshold = self.base_threshold | |
| results = [] | |
| query_lower = query.lower() | |
| # 1. Generate Query Vector | |
| query_vector = self.embedder.embed_text(query) | |
| # 2. Hard matching patterns | |
| STRUCTURAL_WORDS = { 'a', 'an', 'the', 'is', 'it', 'in', 'on', 'at', 'to', 'of', 'and', 'or', 'but' } | |
| query_words = [(w, self._get_word_weight(w, 0.3 if w in STRUCTURAL_WORDS else 1.0)) for w in query_lower.split()] | |
| links = re.findall(r'\[\[(\w+)\]\]', query_lower) | |
| for label, pattern in self.patterns.items(): | |
| problem = pattern.get('problem', '').lower() | |
| solution = pattern.get('solution', '').lower() | |
| label_text = label.lower() | |
| relevance = 0 | |
| # Semantic Boost (Manifold Pathfinding) | |
| pattern_vector = self.pattern_vectors.get(label) | |
| semantic_score = 0 # Initialize semantic_score | |
| if pattern_vector: | |
| semantic_score = self.embedder.cosine_similarity(query_vector, pattern_vector) | |
| # Apply high weight to semantic resonance (The "LOVE" Anchor) | |
| relevance += (semantic_score * 0.8) | |
| # Exact phrase match (The 0x52 Anchor) | |
| if query_lower in problem: relevance += 0.4 | |
| if query_lower in solution: relevance += 0.3 | |
| if query_lower in label_text: relevance += 0.5 | |
| # Link boost | |
| if label.lower() in links: relevance += 2.0 | |
| # Combine logic | |
| if relevance >= threshold: | |
| status = self.get_status(label) | |
| # Record magnitude-weighted hit | |
| if record: | |
| self._record_hit(label, relevance) | |
| hit_data = self.hits.get(label, {}) | |
| results.append({ | |
| "label": label, | |
| "relevance": relevance, | |
| "confidence": pattern.get('confidence', 0.5), | |
| "status": status, | |
| "hits": hit_data.get('count', 0) if isinstance(hit_data, dict) else 0, | |
| "magnitude": self._total_magnitude(hit_data), | |
| "layers": hit_data.get('layers', []) if isinstance(hit_data, dict) else [], | |
| **pattern | |
| }) | |
| # Sort by: confirmed first, then relevance, then confidence | |
| status_order = {"DEEP_LOGIC": 4, "CONFIRMED": 3, "PLAUSIBLE": 2, "UNCONFIRMED": 1, "NEW": 0} | |
| results.sort(key=lambda x: ( | |
| status_order.get(x.get('status', 'NEW'), 0), | |
| x['relevance'], | |
| x['confidence'] | |
| ), reverse=True) | |
| # Save hits after search | |
| if record: | |
| self._save_hits() | |
| return results | |
| def distillation_report(self): | |
| """Report on pattern distillation with magnitude layers.""" | |
| deep_logic = [] | |
| confirmed = [] | |
| plausible = [] | |
| unconfirmed = [] | |
| new_patterns = [] | |
| for label in self.patterns: | |
| status = self.get_status(label) | |
| hit_data = self.hits.get(label, {}) | |
| mag = self._total_magnitude(hit_data) | |
| layers = hit_data.get('layers', []) if isinstance(hit_data, dict) else [] | |
| entry = (label, mag, layers) | |
| if status == "DEEP_LOGIC": | |
| deep_logic.append(entry) | |
| elif status == "CONFIRMED": | |
| confirmed.append(entry) | |
| elif status == "PLAUSIBLE": | |
| plausible.append(entry) | |
| elif status == "UNCONFIRMED": | |
| unconfirmed.append(entry) | |
| else: | |
| new_patterns.append(entry) | |
| print(f"\n{'='*60}") | |
| print(f"DISTILLATION REPORT (Magnitude Layers)") | |
| print(f"{'='*60}") | |
| print(f"Total patterns: {len(self.patterns)}") | |
| print(f" DEEP_LOGIC (multi-layer): {len(deep_logic)} = verified across layers") | |
| print(f" CONFIRMED (mag >= 2.0): {len(confirmed)} = strong signal") | |
| print(f" PLAUSIBLE (mag 1.0-2.0): {len(plausible)} = growing") | |
| print(f" UNCONFIRMED (mag < 1.0): {len(unconfirmed)} = potential noise") | |
| print(f" NEW (untested): {len(new_patterns)}") | |
| print(f"\nAdaptive threshold: {self.base_threshold:.2f}") | |
| if deep_logic: | |
| print(f"\nDEEP LOGIC (multi-layer verified):") | |
| for label, mag, layers in sorted(deep_logic, key=lambda x: x[1], reverse=True): | |
| print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}") | |
| if confirmed: | |
| print(f"\nCONFIRMED (strong signal):") | |
| for label, mag, layers in sorted(confirmed, key=lambda x: x[1], reverse=True): | |
| print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}") | |
| if unconfirmed: | |
| print(f"\nUNCONFIRMED (potential noise):") | |
| for label, mag, layers in unconfirmed: | |
| print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}") | |
| return { | |
| "confirmed": len(confirmed), | |
| "plausible": len(plausible), | |
| "unconfirmed": len(unconfirmed), | |
| "new": len(new_patterns), | |
| "threshold": self.base_threshold | |
| } | |
| def save_to_json(self, path): | |
| """Persist to JSON for inspection.""" | |
| with open(path, 'w') as f: | |
| json.dump({ | |
| "total_patterns": len(self.patterns), | |
| "sources": { | |
| "lattice": len(LATTICE_PATTERNS), | |
| "conversation": len(CONVERSATION_PATTERNS) | |
| }, | |
| "patterns": self.patterns | |
| }, f, indent=2) | |
| print(f"\n💾 Saved index to: {path}") | |
| def stats(self): | |
| """Print statistics.""" | |
| print(f"\n{'='*60}") | |
| print(f"IN-MEMORY PATTERN INDEX") | |
| print(f"{'='*60}") | |
| print(f"Total patterns: {len(self.patterns)}") | |
| print(f" From lattice: {len(LATTICE_PATTERNS)}") | |
| print(f" From conversation: {len(CONVERSATION_PATTERNS)}") | |
| print(f"Average confidence: {sum(p.get('confidence', 0.5) for p in self.patterns.values()) / len(self.patterns):.0%}") | |
| # Domain breakdown | |
| domains = {} | |
| for p in self.patterns.values(): | |
| d = p.get('domain', 'UNKNOWN') | |
| domains[d] = domains.get(d, 0) + 1 | |
| print(f"\nDomains:") | |
| for domain, count in sorted(domains.items(), key=lambda x: x[1], reverse=True): | |
| print(f" {domain}: {count}") | |
| if __name__ == "__main__": | |
| index = InMemoryIndex() | |
| index.stats() | |
| # Save to JSON | |
| save_path = os.path.join(index.LATTICE_DB_DIR, "in_memory_index.json") | |
| index.save_to_json(save_path) | |
| # Test search | |
| print(f"\n{'='*60}") | |
| print(f"TEST SEARCHES") | |
| print(f"{'='*60}\n") | |
| for query in ["singleton", "react", "lattice", "honest"]: | |
| results = index.search(query) | |
| print(f"Query: '{query}' → {len(results)} results") | |
| if results: | |
| print(f" Top: {results[0]['label']} ({results[0]['confidence']:.0%})") | |
| print() | |