""" IN-MEMORY PATTERN INDEX Fast lookup without HDD writes - merge existing + conversation + Gemini chat patterns """ import sys import os import json import time import re try: from System.semantic_embedder import SemanticEmbedder except ImportError: try: from semantic_embedder import SemanticEmbedder except ImportError: # Final fallback for scripts in Shop/ sys.path.append(os.path.dirname(os.path.abspath(__file__))) from semantic_embedder import SemanticEmbedder # Existing 5 lattice patterns LATTICE_PATTERNS = { "PATTERN_SINGLETON_DATABASE": { "lba": 8534859776, "domain": "SOFTWARE_ARCHITECTURE", "problem": "Need to ensure only one database connection exists", "solution": "Singleton pattern with thread-safe initialization", "reusability": 9, "confidence": 0.82 }, "PATTERN_REACT_HOOKS_DEPS": { "lba": 3371401216, "domain": "WEB_DEVELOPMENT", "problem": "React component not re-rendering when props change", "solution": "Add dependency array to useEffect", "reusability": 10, "confidence": 0.85 } } CONVERSATION_PATTERNS = { "AGENT_IS_LATTICE": { "domain": "CONCEPTUAL", "problem": "Separation between agent and data structure", "solution": "Agent is non-orientable surface - no inside/outside separation", "confidence": 0.95 } } class InMemoryIndex: """ Adaptive Distillation Index. Tracks pattern hit counts to distinguish signal from noise: - Once-patterns (1 hit) = UNCONFIRMED (might be noise) - Twice-patterns (2 hits) = PLAUSIBLE - Multi-patterns (3+ hits) = CONFIRMED (logic) The lattice self-cleans through use. Signal persists, noise decays. """ # Hit tracking file handled dynamically in __init__ HIT_LOG_PATH = None # Magnitude layers: logic exists in layers # Layer 0: Surface (keyword substring match) = low magnitude # Layer 1: Structural (multi-word + domain match) = medium magnitude # Layer 2: Conceptual (phrase match in problem/solution) = high magnitude # Decay: magnitude halves every DECAY_HALF_LIFE seconds without a hit DECAY_HALF_LIFE = 86400 # 24 hours MAGNITUDE_LAYERS = { "surface": 0.3, # keyword substring match (low relevance) "structural": 0.6, # multi-word + domain match (medium) "conceptual": 1.0, # full phrase match in problem/solution (high) } def __init__(self): # Handle relative pathing for portability BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) self.LATTICE_DB_DIR = os.path.join(BASE_DIR, "Lattice_DB") self.HIT_LOG_PATH = os.path.join(self.LATTICE_DB_DIR, "pattern_hits.json") index_path = os.path.join(self.LATTICE_DB_DIR, "dual_anchor_index.json") if os.path.exists(index_path): with open(index_path, 'r') as f: data = json.load(f) self.patterns = data.get('patterns', {}) sources = data.get('sources', {}) print(f"[INDEX] Loaded {len(self.patterns)} dual-anchor patterns") else: # Fallback to original patterns self.patterns = {} self.load_lattice_patterns() self.load_conversation_patterns() print("[INDEX] Dual-anchor index not found, using original 16 patterns") # Load hit tracking (magnitude-weighted) self.hits = self._load_hits() # Calculate adaptive threshold based on pattern count self.base_threshold = 0.3 + (0.4 * min(len(self.patterns) / 200, 1.0)) # Initialize Semantic Engine print("[INDEX] Initializing Semantic Manifold...") self.embedder = SemanticEmbedder() self.pattern_vectors = {} self._reindex_vectors() confirmed = sum(1 for h in self.hits.values() if self._total_magnitude(h) >= 2.0) unconfirmed = sum(1 for h in self.hits.values() if 0 < self._total_magnitude(h) < 1.0) print(f"[DISTILLER] Confirmed: {confirmed} | Unconfirmed: {unconfirmed} | Threshold: {self.base_threshold:.2f}") self.word_freq = self._calculate_word_freq() def _reindex_vectors(self): """Pre-calculates semantic embeddings for all known patterns.""" print(f"[INDEX]: Generating embeddings for {len(self.patterns)} patterns...") for label, p in self.patterns.items(): # Combine problem + solution for semantic context context = f"{p.get('problem', '')} {p.get('solution', '')} {label}" self.pattern_vectors[label] = self.embedder.embed_text(context) print(f"[INDEX]: āœ… Semantic manifold mapped ({len(self.pattern_vectors)} vectors).") def _calculate_word_freq(self): """Calculate inverse pattern frequency (IPF) for lean semantic weighting.""" freq = {} for p in self.patterns.values(): text = (p.get('problem','') + " " + p.get('solution','')).lower() words = set(re.findall(r'\w+', text)) for w in words: freq[w] = freq.get(w, 0) + 1 return freq def _get_word_weight(self, word, structural_weight): """Calculate semantic weight: rare words matter more.""" count = self.word_freq.get(word, 0) if count == 0: return structural_weight # Logarithmic scale for IPF: weight = 1 + log(total / count) import math ipf = 1.0 + math.log(len(self.patterns) / count) return structural_weight * ipf def _fuzzy_match(self, w1, w2): """Lightweight Jaccard similarity for fuzzy matching.""" if w1 == w2: return 1.0 if len(w1) < 4 or len(w2) < 4: return 0.0 s1, s2 = set(w1), set(w2) intersection = len(s1 & s2) union = len(s1 | s2) score = intersection / union return score if score > 0.7 else 0.0 def _load_hits(self): """Load magnitude-weighted hit data from disk.""" if os.path.exists(self.HIT_LOG_PATH): with open(self.HIT_LOG_PATH, 'r') as f: data = json.load(f) # Backward compat: convert flat counts to magnitude format for label, val in data.items(): if isinstance(val, (int, float)): data[label] = {"count": int(val), "magnitude": float(val) * 0.5, "layers": []} return data return {} def _save_hits(self): """Persist hit data to disk.""" with open(self.HIT_LOG_PATH, 'w') as f: json.dump(self.hits, f, indent=2) def _total_magnitude(self, hit_data): """Get current magnitude with decay applied.""" if isinstance(hit_data, dict): raw_mag = hit_data.get('magnitude', 0) last_hit = hit_data.get('last_hit', 0) if last_hit > 0 and raw_mag > 0: elapsed = time.time() - last_hit # Halve every DECAY_HALF_LIFE seconds decay_factor = 0.5 ** (elapsed / self.DECAY_HALF_LIFE) return raw_mag * decay_factor return raw_mag return float(hit_data) * 0.5 # backward compat def _classify_relevance(self, relevance): """Classify match into magnitude layer based on relevance score.""" if relevance >= 0.7: return "conceptual", self.MAGNITUDE_LAYERS["conceptual"] elif relevance >= 0.4: return "structural", self.MAGNITUDE_LAYERS["structural"] else: return "surface", self.MAGNITUDE_LAYERS["surface"] def _record_hit(self, label, relevance): """Record a hit. Re-mention restores magnitude to peak.""" layer_name, magnitude = self._classify_relevance(relevance) if label not in self.hits: self.hits[label] = {"count": 0, "magnitude": 0.0, "peak": 0.0, "layers": [], "last_hit": 0} h = self.hits[label] h["count"] += 1 h["last_hit"] = time.time() # Restore to peak first (re-mention recovery), then add new magnitude current_peak = h.get("peak", h["magnitude"]) h["magnitude"] = current_peak + magnitude h["peak"] = h["magnitude"] # new peak # Track which layers have been hit if layer_name not in h["layers"]: h["layers"].append(layer_name) def get_status(self, label): """Get distillation status based on decayed magnitude.""" hit_data = self.hits.get(label, {}) mag = self._total_magnitude(hit_data) # applies decay layers = hit_data.get('layers', []) if isinstance(hit_data, dict) else [] if mag == 0: return "NEW" elif mag < 1.0: return "UNCONFIRMED" # surface-only = might be noise elif mag < 2.0: return "PLAUSIBLE" elif len(layers) >= 2: return "DEEP_LOGIC" # hit at multiple layers = real else: return "CONFIRMED" # high magnitude single layer def add_note(self, text, domain="NOTE", forced_label=None): """Add a new pattern from freeform text. Self-organizing entry point.""" if forced_label: label = forced_label else: # Auto-generate label from text words = re.sub(r'[^a-zA-Z0-9\s]', '', text).upper().split() # Take first 4 meaningful words for label label_words = [w for w in words if len(w) > 2][:4] label = "_".join(label_words) if label_words else "NOTE_" + str(int(time.time())) # Don't overwrite existing patterns unless forced if label in self.patterns and not forced_label: label = label + "_" + str(int(time.time()) % 10000) self.patterns[label] = { "problem": text, "solution": text, "domain": domain, "confidence": 0.5, # starts neutral "source": "notepad", "type": "NOTE", "created": time.time(), } # Initial hit at conceptual layer (you wrote it = you meant it) self._record_hit(label, 1.0) self._save_hits() # Update threshold for new pattern count self.base_threshold = 0.3 + (0.4 * min(len(self.patterns) / 200, 1.0)) return label def load_lattice_patterns(self): """Load existing 5 patterns from lattice.""" for label, data in LATTICE_PATTERNS.items(): self.patterns[label] = { **data, "source": "lattice", "type": "CODE_PATTERN" } def load_conversation_patterns(self): """Load 11 patterns from this conversation.""" for label, data in CONVERSATION_PATTERNS.items(): self.patterns[label] = { **data, "source": "conversation_0938ac6c", "type": "INSIGHT" } def search(self, query, threshold=None, record=True): """ Adaptive distillation search. - Matches patterns using phrase + word relevance - Integrates 384-dim semantic similarity from manifolds - Records hits for matched patterns """ if threshold is None: threshold = self.base_threshold results = [] query_lower = query.lower() # 1. Generate Query Vector query_vector = self.embedder.embed_text(query) # 2. Hard matching patterns STRUCTURAL_WORDS = { 'a', 'an', 'the', 'is', 'it', 'in', 'on', 'at', 'to', 'of', 'and', 'or', 'but' } query_words = [(w, self._get_word_weight(w, 0.3 if w in STRUCTURAL_WORDS else 1.0)) for w in query_lower.split()] links = re.findall(r'\[\[(\w+)\]\]', query_lower) for label, pattern in self.patterns.items(): problem = pattern.get('problem', '').lower() solution = pattern.get('solution', '').lower() label_text = label.lower() relevance = 0 # Semantic Boost (Manifold Pathfinding) pattern_vector = self.pattern_vectors.get(label) semantic_score = 0 # Initialize semantic_score if pattern_vector: semantic_score = self.embedder.cosine_similarity(query_vector, pattern_vector) # Apply high weight to semantic resonance (The "LOVE" Anchor) relevance += (semantic_score * 0.8) # Exact phrase match (The 0x52 Anchor) if query_lower in problem: relevance += 0.4 if query_lower in solution: relevance += 0.3 if query_lower in label_text: relevance += 0.5 # Link boost if label.lower() in links: relevance += 2.0 # Combine logic if relevance >= threshold: status = self.get_status(label) # Record magnitude-weighted hit if record: self._record_hit(label, relevance) hit_data = self.hits.get(label, {}) results.append({ "label": label, "relevance": relevance, "confidence": pattern.get('confidence', 0.5), "status": status, "hits": hit_data.get('count', 0) if isinstance(hit_data, dict) else 0, "magnitude": self._total_magnitude(hit_data), "layers": hit_data.get('layers', []) if isinstance(hit_data, dict) else [], **pattern }) # Sort by: confirmed first, then relevance, then confidence status_order = {"DEEP_LOGIC": 4, "CONFIRMED": 3, "PLAUSIBLE": 2, "UNCONFIRMED": 1, "NEW": 0} results.sort(key=lambda x: ( status_order.get(x.get('status', 'NEW'), 0), x['relevance'], x['confidence'] ), reverse=True) # Save hits after search if record: self._save_hits() return results def distillation_report(self): """Report on pattern distillation with magnitude layers.""" deep_logic = [] confirmed = [] plausible = [] unconfirmed = [] new_patterns = [] for label in self.patterns: status = self.get_status(label) hit_data = self.hits.get(label, {}) mag = self._total_magnitude(hit_data) layers = hit_data.get('layers', []) if isinstance(hit_data, dict) else [] entry = (label, mag, layers) if status == "DEEP_LOGIC": deep_logic.append(entry) elif status == "CONFIRMED": confirmed.append(entry) elif status == "PLAUSIBLE": plausible.append(entry) elif status == "UNCONFIRMED": unconfirmed.append(entry) else: new_patterns.append(entry) print(f"\n{'='*60}") print(f"DISTILLATION REPORT (Magnitude Layers)") print(f"{'='*60}") print(f"Total patterns: {len(self.patterns)}") print(f" DEEP_LOGIC (multi-layer): {len(deep_logic)} = verified across layers") print(f" CONFIRMED (mag >= 2.0): {len(confirmed)} = strong signal") print(f" PLAUSIBLE (mag 1.0-2.0): {len(plausible)} = growing") print(f" UNCONFIRMED (mag < 1.0): {len(unconfirmed)} = potential noise") print(f" NEW (untested): {len(new_patterns)}") print(f"\nAdaptive threshold: {self.base_threshold:.2f}") if deep_logic: print(f"\nDEEP LOGIC (multi-layer verified):") for label, mag, layers in sorted(deep_logic, key=lambda x: x[1], reverse=True): print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}") if confirmed: print(f"\nCONFIRMED (strong signal):") for label, mag, layers in sorted(confirmed, key=lambda x: x[1], reverse=True): print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}") if unconfirmed: print(f"\nUNCONFIRMED (potential noise):") for label, mag, layers in unconfirmed: print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}") return { "confirmed": len(confirmed), "plausible": len(plausible), "unconfirmed": len(unconfirmed), "new": len(new_patterns), "threshold": self.base_threshold } def save_to_json(self, path): """Persist to JSON for inspection.""" with open(path, 'w') as f: json.dump({ "total_patterns": len(self.patterns), "sources": { "lattice": len(LATTICE_PATTERNS), "conversation": len(CONVERSATION_PATTERNS) }, "patterns": self.patterns }, f, indent=2) print(f"\nšŸ’¾ Saved index to: {path}") def stats(self): """Print statistics.""" print(f"\n{'='*60}") print(f"IN-MEMORY PATTERN INDEX") print(f"{'='*60}") print(f"Total patterns: {len(self.patterns)}") print(f" From lattice: {len(LATTICE_PATTERNS)}") print(f" From conversation: {len(CONVERSATION_PATTERNS)}") print(f"Average confidence: {sum(p.get('confidence', 0.5) for p in self.patterns.values()) / len(self.patterns):.0%}") # Domain breakdown domains = {} for p in self.patterns.values(): d = p.get('domain', 'UNKNOWN') domains[d] = domains.get(d, 0) + 1 print(f"\nDomains:") for domain, count in sorted(domains.items(), key=lambda x: x[1], reverse=True): print(f" {domain}: {count}") if __name__ == "__main__": index = InMemoryIndex() index.stats() # Save to JSON save_path = os.path.join(index.LATTICE_DB_DIR, "in_memory_index.json") index.save_to_json(save_path) # Test search print(f"\n{'='*60}") print(f"TEST SEARCHES") print(f"{'='*60}\n") for query in ["singleton", "react", "lattice", "honest"]: results = index.search(query) print(f"Query: '{query}' → {len(results)} results") if results: print(f" Top: {results[0]['label']} ({results[0]['confidence']:.0%})") print()