#!/usr/bin/env python3 """ Mnemo v4: SLM-Inspired Architecture ==================================== Implements key SLM architecture features with parameter adjustments based on Mnemo benchmark findings. SLM Features Implemented: 1. Three-Tiered Memory (Working → Token → Semantic) 2. Promotion/Demotion Algorithms 3. Neural Link Types (8 types with decay) 4. Self-Tuning Parameters 5. Memory Utility Predictor (NEW - from benchmarks) Key Parameter Adjustments (from benchmarks): - Semantic threshold: 0.65 → 0.50 (SLM was too high) - Quality acceptance: 0.30 → 0.50 (SLM too permissive) - Promotion threshold: 0.65 → 0.55 (faster promotion) - Link pruning: 60 days → 30 days (faster cleanup) """ import hashlib import time import re import threading import numpy as np from typing import Dict, List, Optional, Tuple, Any, Set from dataclasses import dataclass, field from collections import defaultdict from enum import Enum import json # Optional imports try: import faiss HAS_FAISS = True except ImportError: HAS_FAISS = False try: import networkx as nx HAS_NETWORKX = True except ImportError: HAS_NETWORKX = False try: from rank_bm25 import BM25Okapi HAS_BM25 = True except ImportError: HAS_BM25 = False # ============================================================================= # ENUMS AND CONSTANTS (from SLM spec) # ============================================================================= class MemoryTier(Enum): """Three-tiered memory hierarchy from SLM""" WORKING = "working" # 32MB, <1ms, current context TOKEN = "token" # 100-250 items, 1-10ms, compressed SEMANTIC = "semantic" # Persistent, 10-100ms, full knowledge class LinkType(Enum): """Eight link types from SLM Neural Link system""" DIRECT_REFERENCE = "direct_reference" # Explicit reference SEMANTIC_SIMILARITY = "semantic_similarity" # Vector similarity CO_OCCURRENCE = "co_occurrence" # Appear together HIERARCHICAL = "hierarchical" # Parent-child TEMPORAL = "temporal" # Time-based CAUSAL = "causal" # Cause-effect CROSS_DOMAIN = "cross_domain" # Different domains ASSOCIATIVE = "associative" # General association # SLM Link Type Properties (adjusted based on benchmarks) LINK_PROPERTIES = { LinkType.DIRECT_REFERENCE: { "creation_threshold": 0.85, # SLM: 0.90 "initial_strength": 0.90, "decay_rate": 0.005, # per day "usage_boost": 0.05 }, LinkType.SEMANTIC_SIMILARITY: { "creation_threshold": 0.50, # SLM: 0.65, ADJUSTED from benchmarks "initial_strength": 0.75, "decay_rate": 0.01, "usage_boost": 0.03 }, LinkType.CO_OCCURRENCE: { "creation_threshold": 0.60, "initial_strength": 0.70, "decay_rate": 0.015, "usage_boost": 0.04 }, LinkType.HIERARCHICAL: { "creation_threshold": 0.80, # SLM: 0.85 "initial_strength": 0.85, "decay_rate": 0.003, "usage_boost": 0.02 }, LinkType.TEMPORAL: { "creation_threshold": 0.55, "initial_strength": 0.65, "decay_rate": 0.02, "usage_boost": 0.05 }, LinkType.CAUSAL: { "creation_threshold": 0.75, "initial_strength": 0.80, "decay_rate": 0.005, "usage_boost": 0.03 }, LinkType.CROSS_DOMAIN: { "creation_threshold": 0.70, # SLM: 0.80 "initial_strength": 0.65, # SLM: 0.70 "decay_rate": 0.008, "usage_boost": 0.04 }, LinkType.ASSOCIATIVE: { "creation_threshold": 0.45, # Permissive for exploration "initial_strength": 0.60, "decay_rate": 0.025, "usage_boost": 0.06 } } # ============================================================================= # DATA CLASSES # ============================================================================= @dataclass class Memory: """Memory unit with SLM-style metadata""" id: str content: str embedding: np.ndarray tier: MemoryTier = MemoryTier.SEMANTIC namespace: str = "default" # Quality and relevance (SLM quality gates) quality_score: float = 0.5 relevance_score: float = 0.5 confidence: float = 0.5 # Access tracking (for promotion/demotion) access_count: int = 0 last_accessed: float = field(default_factory=time.time) created_at: float = field(default_factory=time.time) # SLM priority decay priority: float = 1.0 metadata: Dict = field(default_factory=dict) @dataclass class NeuralLink: """SLM Neural Link between memories""" source_id: str target_id: str link_type: LinkType strength: float created_at: float = field(default_factory=time.time) last_traversed: float = field(default_factory=time.time) traversal_count: int = 0 @dataclass class SearchResult: """Search result with multi-strategy scores""" id: str content: str score: float tier: MemoryTier = MemoryTier.SEMANTIC link_path: List[str] = field(default_factory=list) strategy_scores: Dict[str, float] = field(default_factory=dict) metadata: Dict = field(default_factory=dict) # ============================================================================= # MEMORY UTILITY PREDICTOR (NEW - from Mnemo benchmarks) # ============================================================================= class MemoryUtilityPredictor: """ Predicts whether memory injection will help or hurt. Key finding from benchmarks: - Within-conversation: Memory often HURTS (-3 to -12 pts) - Cross-session: Memory HELPS (+2 pts on dependent questions) """ # Signals that indicate memory should be used INJECTION_SIGNALS = [ "previous", "earlier", "before", "you said", "you mentioned", "as you", "based on", "using your", "your analysis", "your framework", "we discussed", "we analyzed", "refer to", "from your", "compare", "contrast", "synthesize", "combine", "integrate", "apply your", "using your", "based on your", "you previously", "your earlier", "you have analyzed" ] # Signals that indicate memory should NOT be used SKIP_SIGNALS = [ "this is a new", "new topic", "different subject", "what is", "define", "explain what" ] def __init__(self): self.stats = { "predictions": 0, "inject_recommended": 0, "skip_recommended": 0, "skip_context_window": 0 } def should_inject(self, query: str, context: str = "", conversation_history: str = "", model_confidence: float = 0.5) -> Tuple[bool, str, float]: """ Predict if memory injection will help. Returns: (should_inject, reason, confidence) """ self.stats["predictions"] += 1 combined = (query + " " + context).lower() # Check skip signals first for signal in self.SKIP_SIGNALS: if signal in combined: self.stats["skip_recommended"] += 1 return False, f"skip_signal:{signal}", 0.8 # Check injection signals for signal in self.INJECTION_SIGNALS: if signal in combined: # But check if context window already has info if self._context_has_info(query, conversation_history): self.stats["skip_context_window"] += 1 return False, "context_window_sufficient", 0.7 self.stats["inject_recommended"] += 1 return True, f"inject_signal:{signal}", 0.85 # No clear signal - default to skip for simple queries if self._is_simple_query(query): self.stats["skip_recommended"] += 1 return False, "simple_query", 0.6 # Model is very confident - skip memory if model_confidence > 0.85: self.stats["skip_recommended"] += 1 return False, "model_confident", 0.7 # Default: don't inject (memory often hurts) self.stats["skip_recommended"] += 1 return False, "no_signal", 0.5 def _context_has_info(self, query: str, history: str) -> bool: """Check if conversation history already has needed context""" if not history or len(history.split()) < 200: return False query_keywords = set(query.lower().split()) - { "the", "a", "is", "are", "to", "of", "in", "for", "what", "how" } history_lower = history.lower() overlap = sum(1 for kw in query_keywords if kw in history_lower) return overlap >= len(query_keywords) * 0.6 def _is_simple_query(self, query: str) -> bool: """Detect simple factual queries that don't need memory""" simple_patterns = [ r"^what is\b", r"^who is\b", r"^when did\b", r"^where is\b", r"^how many\b", r"^define\b" ] query_lower = query.lower() return any(re.search(p, query_lower) for p in simple_patterns) # ============================================================================= # SELF-TUNING SYSTEM (from SLM) # ============================================================================= class SelfTuner: """ SLM Self-Tuning Parameter System Tracks performance and auto-adjusts parameters. """ def __init__(self): self.parameters = { "similarity_threshold": 0.10, # ADJUSTED from SLM 0.65 "quality_threshold": 0.35, # ADJUSTED from SLM 0.30 "promotion_threshold": 0.55, # ADJUSTED from SLM 0.65 "demotion_threshold": 0.70, # ADJUSTED from SLM 0.75 } self.performance_history = defaultdict(list) self.adjustment_count = 0 # SLM learning rates self.learning_rates = { "similarity_threshold": 0.01, "quality_threshold": 0.02, "promotion_threshold": 0.05, } def record_outcome(self, param_name: str, value: float, success: bool): """Record outcome for a parameter setting""" self.performance_history[param_name].append({ "value": value, "success": success, "timestamp": time.time() }) # Keep last 100 outcomes if len(self.performance_history[param_name]) > 100: self.performance_history[param_name] = \ self.performance_history[param_name][-100:] def should_adjust(self, param_name: str) -> bool: """Check if parameter should be adjusted (every 10 samples)""" history = self.performance_history.get(param_name, []) return len(history) >= 10 and len(history) % 10 == 0 def get_adjustment(self, param_name: str) -> float: """Calculate parameter adjustment based on recent performance""" history = self.performance_history.get(param_name, []) if len(history) < 10: return 0.0 recent = history[-10:] success_rate = sum(1 for h in recent if h["success"]) / len(recent) lr = self.learning_rates.get(param_name, 0.01) if success_rate < 0.5: # Performance poor - try lower threshold return -lr elif success_rate > 0.8: # Performance good - can be more selective return lr * 0.5 return 0.0 def auto_tune(self): """Run auto-tuning cycle""" adjusted = [] for param_name in self.parameters: if self.should_adjust(param_name): adjustment = self.get_adjustment(param_name) if adjustment != 0: old_val = self.parameters[param_name] new_val = max(0.1, min(0.9, old_val + adjustment)) self.parameters[param_name] = new_val adjusted.append((param_name, old_val, new_val)) self.adjustment_count += 1 return adjusted # ============================================================================= # THREE-TIERED MEMORY MANAGER (from SLM) # ============================================================================= class TieredMemoryManager: """ SLM Three-Tiered Memory Hierarchy Working Memory (32MB, <1ms): - Currently active info - Priority decay: 0.95/minute - Eviction threshold: 0.2 Token Memory (100-250 items, 1-10ms): - Compressed representations - Loop-based organization - Merging at 0.8 similarity Semantic Memory (persistent, 10-100ms): - Full knowledge representations - Partition-based organization """ # SLM spec values (some adjusted based on benchmarks) WORKING_MEMORY_SIZE = 50 # items (simplified from 32MB) TOKEN_LOOP_CAPACITY = 100 # default TOKEN_LOOP_MAX = 250 # expandable PRIORITY_DECAY = 0.95 # per access cycle EVICTION_THRESHOLD = 0.2 LOOP_MERGE_THRESHOLD = 0.8 # Memory decay settings (gentle) MEMORY_DECAY_RATE = 0.01 # 1% quality decay per day for unused memories MEMORY_PRUNE_THRESHOLD = 0.15 # Prune memories below this quality MEMORY_STALE_DAYS = 30 # Consider memory stale after this many days unused def __init__(self, tuner: SelfTuner): self.tuner = tuner # Three tiers self.working_memory: Dict[str, Memory] = {} self.token_loops: Dict[str, List[str]] = defaultdict(list) # namespace -> ids self.semantic_memory: Dict[str, Memory] = {} self.stats = { "promotions": 0, "demotions": 0, "evictions": 0, "memories_decayed": 0, "memories_pruned": 0 } def add_to_tier(self, memory: Memory, tier: MemoryTier): """Add memory to specific tier""" memory.tier = tier if tier == MemoryTier.WORKING: self._add_to_working(memory) elif tier == MemoryTier.TOKEN: self._add_to_token(memory) else: self.semantic_memory[memory.id] = memory def _add_to_working(self, memory: Memory): """Add to working memory with eviction if needed""" if len(self.working_memory) >= self.WORKING_MEMORY_SIZE: self._evict_from_working() memory.priority = 1.0 self.working_memory[memory.id] = memory def _add_to_token(self, memory: Memory): """Add to token memory loop""" loop = self.token_loops[memory.namespace] if len(loop) >= self.TOKEN_LOOP_CAPACITY: # Demote oldest to semantic oldest_id = loop.pop(0) if oldest_id in self.semantic_memory: self.semantic_memory[oldest_id].tier = MemoryTier.SEMANTIC loop.append(memory.id) self.semantic_memory[memory.id] = memory # Store actual data in semantic memory.tier = MemoryTier.TOKEN def _evict_from_working(self): """Evict lowest priority items from working memory""" if not self.working_memory: return # Find lowest priority min_id = min(self.working_memory, key=lambda k: self.working_memory[k].priority) evicted = self.working_memory.pop(min_id) # Demote to token memory self._add_to_token(evicted) self.stats["evictions"] += 1 def decay_priorities(self): """Apply SLM priority decay (0.95 per cycle)""" for memory in self.working_memory.values(): memory.priority *= self.PRIORITY_DECAY # Evict if below threshold if memory.priority < self.EVICTION_THRESHOLD: self._evict_from_working() def calculate_promotion_score(self, memory: Memory, query_relevance: float) -> float: """ SLM Promotion Score: PromotionScore = (QueryRelevance * 0.6) + (AccessFrequency * 0.3) + (RecencyScore * 0.1) """ # Normalize access frequency (0-1) access_freq = min(memory.access_count / 10, 1.0) # Recency score (higher = more recent) age_hours = (time.time() - memory.last_accessed) / 3600 recency = max(0, 1 - (age_hours / 24)) # Decay over 24 hours return (query_relevance * 0.6) + (access_freq * 0.3) + (recency * 0.1) def calculate_demotion_score(self, memory: Memory, query_relevance: float) -> float: """ SLM Demotion Score: DemotionScore = (1-QueryRelevance)*0.5 + (1-AccessFrequency)*0.3 + (Age/MAX_AGE)*0.2 """ access_freq = min(memory.access_count / 10, 1.0) age_hours = (time.time() - memory.created_at) / 3600 age_score = min(age_hours / 168, 1.0) # MAX_AGE = 1 week return ((1 - query_relevance) * 0.5) + ((1 - access_freq) * 0.3) + (age_score * 0.2) def try_promote(self, memory_id: str, query_relevance: float) -> bool: """Try to promote memory to higher tier""" if memory_id not in self.semantic_memory: return False memory = self.semantic_memory[memory_id] score = self.calculate_promotion_score(memory, query_relevance) threshold = self.tuner.parameters["promotion_threshold"] if score > threshold: if memory.tier == MemoryTier.SEMANTIC: self._add_to_token(memory) self.stats["promotions"] += 1 return True elif memory.tier == MemoryTier.TOKEN: self._add_to_working(memory) self.stats["promotions"] += 1 return True return False def try_demote(self, memory_id: str, query_relevance: float) -> bool: """Try to demote memory to lower tier""" if memory_id in self.working_memory: memory = self.working_memory[memory_id] score = self.calculate_demotion_score(memory, query_relevance) threshold = self.tuner.parameters["demotion_threshold"] # Also check capacity (SLM: demote if >80% capacity) capacity_pressure = len(self.working_memory) / self.WORKING_MEMORY_SIZE if score > threshold and capacity_pressure > 0.8: self.working_memory.pop(memory_id) self._add_to_token(memory) self.stats["demotions"] += 1 return True return False def get_all_memories(self) -> Dict[str, Memory]: """Get all memories across tiers""" return {**self.semantic_memory, **self.working_memory} def decay_memories(self) -> int: """ Apply gentle quality decay to unused semantic memories. Memories that are accessed stay fresh; unused ones gradually decay. Returns number of memories affected. """ now = time.time() affected = 0 for memory in self.semantic_memory.values(): # Calculate days since last access days_unused = (now - memory.last_accessed) / 86400 # seconds per day if days_unused > 1: # Only decay if unused for >1 day # Gentle decay: quality *= (1 - decay_rate * days_unused) # Capped to prevent instant destruction decay_factor = min(days_unused * self.MEMORY_DECAY_RATE, 0.1) memory.quality_score *= (1 - decay_factor) affected += 1 return affected def prune_stale_memories(self) -> Tuple[int, List[str]]: """ Remove memories that have decayed below threshold. Returns (count_pruned, list_of_pruned_ids). """ now = time.time() to_prune = [] for mem_id, memory in self.semantic_memory.items(): days_unused = (now - memory.last_accessed) / 86400 # Prune if: quality too low AND unused for too long if (memory.quality_score < self.MEMORY_PRUNE_THRESHOLD and days_unused > self.MEMORY_STALE_DAYS): to_prune.append(mem_id) # Remove pruned memories pruned_ids = [] for mem_id in to_prune: del self.semantic_memory[mem_id] pruned_ids.append(mem_id) return len(pruned_ids), pruned_ids def refresh_memory(self, memory_id: str): """Mark a memory as freshly accessed (resets decay)""" if memory_id in self.semantic_memory: self.semantic_memory[memory_id].last_accessed = time.time() elif memory_id in self.working_memory: self.working_memory[memory_id].last_accessed = time.time() def get_tier_stats(self) -> Dict: """Get tier statistics""" return { "working_memory_count": len(self.working_memory), "working_memory_capacity": self.WORKING_MEMORY_SIZE, "token_loops": {ns: len(ids) for ns, ids in self.token_loops.items()}, "semantic_memory_count": len(self.semantic_memory), "promotions": self.stats["promotions"], "demotions": self.stats["demotions"], "evictions": self.stats["evictions"] } # ============================================================================= # NEURAL LINK MANAGER (from SLM) # ============================================================================= class NeuralLinkManager: """ SLM Neural Link Pathway System Creates and manages typed connections between memories. """ # SLM path finding limits (adjusted based on benchmarks) MAX_PATH_DEPTH = 4 # SLM: 4 standard, 6 exhaustive MIN_PATH_STRENGTH = 0.40 # SLM: 0.45 PATH_STRENGTH_DECAY = 0.9 # SLM: 0.9 per hop MAX_BRANCHING = 12 # SLM: 12 # Pruning (adjusted based on benchmarks) PRUNE_STRENGTH_THRESHOLD = 0.25 # SLM: 0.30 PRUNE_AGE_DAYS = 30 # SLM: 60, ADJUSTED def __init__(self): self.links: Dict[str, NeuralLink] = {} # link_id -> NeuralLink self.outgoing: Dict[str, Set[str]] = defaultdict(set) # source -> link_ids self.incoming: Dict[str, Set[str]] = defaultdict(set) # target -> link_ids self.stats = { "links_created": 0, "links_pruned": 0, "traversals": 0 } def _link_id(self, source: str, target: str, link_type: LinkType) -> str: """Generate link ID""" return f"{source}:{target}:{link_type.value}" def create_link(self, source_id: str, target_id: str, link_type: LinkType, similarity: float) -> Optional[str]: """ Create link if similarity exceeds type-specific threshold. SLM LinkScore = (VectorSimilarity * 0.6) + (CoOccurrence * 0.25) + (DomainRelatedness * 0.15) Simplified here to just similarity. """ props = LINK_PROPERTIES[link_type] if similarity < props["creation_threshold"]: return None link_id = self._link_id(source_id, target_id, link_type) if link_id in self.links: # Strengthen existing link self.links[link_id].strength = min( 1.0, self.links[link_id].strength + props["usage_boost"] ) return link_id # Create new link link = NeuralLink( source_id=source_id, target_id=target_id, link_type=link_type, strength=props["initial_strength"] ) self.links[link_id] = link self.outgoing[source_id].add(link_id) self.incoming[target_id].add(link_id) self.stats["links_created"] += 1 return link_id def traverse_link(self, link_id: str) -> Optional[NeuralLink]: """Traverse a link, strengthening it""" if link_id not in self.links: return None link = self.links[link_id] link.traversal_count += 1 link.last_traversed = time.time() # Strengthen on traversal (up to daily max) props = LINK_PROPERTIES[link.link_type] link.strength = min(1.0, link.strength + props["usage_boost"]) self.stats["traversals"] += 1 return link def find_paths(self, source_id: str, target_id: str, max_depth: int = None) -> List[List[str]]: """Find paths between memories (SLM path finding)""" max_depth = max_depth or self.MAX_PATH_DEPTH paths = [] def dfs(current: str, target: str, path: List[str], strength: float, depth: int): if depth > max_depth or strength < self.MIN_PATH_STRENGTH: return if current == target: paths.append(path.copy()) return # Limit branching link_ids = list(self.outgoing.get(current, set()))[:self.MAX_BRANCHING] for link_id in link_ids: link = self.links.get(link_id) if link and link.target_id not in path: new_strength = strength * link.strength * self.PATH_STRENGTH_DECAY path.append(link.target_id) dfs(link.target_id, target, path, new_strength, depth + 1) path.pop() dfs(source_id, target_id, [source_id], 1.0, 0) return paths def get_connected(self, memory_id: str, link_types: List[LinkType] = None) -> List[str]: """Get memories connected to this one""" connected = [] for link_id in self.outgoing.get(memory_id, set()): link = self.links.get(link_id) if link: if link_types is None or link.link_type in link_types: connected.append(link.target_id) return connected def decay_links(self): """Apply daily decay to all links""" for link in self.links.values(): props = LINK_PROPERTIES[link.link_type] link.strength *= (1 - props["decay_rate"]) def prune_weak_links(self) -> int: """Prune links below strength threshold and unused for too long""" to_prune = [] now = time.time() age_threshold = self.PRUNE_AGE_DAYS * 24 * 3600 for link_id, link in self.links.items(): age = now - link.last_traversed if link.strength < self.PRUNE_STRENGTH_THRESHOLD and age > age_threshold: to_prune.append(link_id) for link_id in to_prune: link = self.links.pop(link_id) self.outgoing[link.source_id].discard(link_id) self.incoming[link.target_id].discard(link_id) self.stats["links_pruned"] += 1 return len(to_prune) def remove_links_for_memory(self, memory_id: str) -> int: """Remove all links connected to a memory (when memory is pruned)""" to_remove = [] # Find all links involving this memory for link_id, link in self.links.items(): if link.source_id == memory_id or link.target_id == memory_id: to_remove.append(link_id) # Remove them for link_id in to_remove: link = self.links.pop(link_id) self.outgoing[link.source_id].discard(link_id) self.incoming[link.target_id].discard(link_id) self.stats["links_pruned"] += 1 # Clean up empty entries if memory_id in self.outgoing: del self.outgoing[memory_id] if memory_id in self.incoming: del self.incoming[memory_id] return len(to_remove) def get_stats(self) -> Dict: return { "total_links": len(self.links), "links_by_type": { lt.value: sum(1 for l in self.links.values() if l.link_type == lt) for lt in LinkType }, **self.stats } # ============================================================================= # MAIN MNEMO v4 CLASS # ============================================================================= class Mnemo: """ Mnemo v4: SLM-Inspired Memory System Implements: - Three-tiered memory hierarchy - Neural link pathways (8 types) - Self-tuning parameters - Memory utility prediction With parameter adjustments based on Mnemo benchmarks. """ STOP_WORDS = {"a", "an", "the", "is", "are", "was", "were", "be", "been", "to", "of", "in", "for", "on", "with", "at", "by", "from", "and", "but", "or", "not", "this", "that", "i", "me", "my"} def __init__(self, embedding_dim: int = 384): self.embedding_dim = embedding_dim # Core components self.tuner = SelfTuner() self.memory_manager = TieredMemoryManager(self.tuner) self.link_manager = NeuralLinkManager() self.utility_predictor = MemoryUtilityPredictor() # Vector index self._embeddings: List[np.ndarray] = [] self._ids: List[str] = [] if HAS_FAISS: self.index = faiss.IndexFlatIP(embedding_dim) else: self.index = None # BM25 self.bm25 = None self._tokenized_docs: List[List[str]] = [] # Knowledge Graph if HAS_NETWORKX: self.graph = nx.DiGraph() else: self.graph = None # Cache self._cache: Dict[str, Any] = {} self._cache_lock = threading.Lock() # Stats self.stats = { "adds": 0, "adds_rejected": 0, "searches": 0, "cache_hits": 0, "cache_misses": 0 } def _get_embedding(self, text: str) -> np.ndarray: """Generate embedding (hash-based for POC)""" cache_key = f"emb:{hashlib.md5(text.encode()).hexdigest()}" with self._cache_lock: if cache_key in self._cache: self.stats["cache_hits"] += 1 return self._cache[cache_key] self.stats["cache_misses"] += 1 # Hash-based embedding embedding = np.zeros(self.embedding_dim, dtype=np.float32) words = text.lower().split() for i, word in enumerate(words): idx = hash(word) % self.embedding_dim embedding[idx] += 1.0 / (i + 1) norm = np.linalg.norm(embedding) if norm > 0: embedding = embedding / norm with self._cache_lock: self._cache[cache_key] = embedding return embedding def _estimate_quality(self, content: str) -> float: """Estimate content quality (SLM quality gates)""" score = 0.5 words = len(content.split()) if words < 5: score -= 0.3 elif words > 20: score += 0.1 if any(r in content.lower() for r in ["because", "therefore", "shows"]): score += 0.2 if re.search(r'\d+', content): score += 0.1 if any(v in content.lower() for v in ["something", "stuff", "maybe"]): score -= 0.2 return max(0.0, min(1.0, score)) def should_inject(self, query: str, context: str = "", conversation_history: str = "", model_confidence: float = 0.5) -> bool: """ Memory Utility Predictor - should we inject memory? Based on benchmark findings that memory often hurts performance. """ should, reason, confidence = self.utility_predictor.should_inject( query, context, conversation_history, model_confidence ) return should def add(self, content: str, namespace: str = "default", metadata: Dict = None, skip_quality_check: bool = False) -> Optional[str]: """Add memory with SLM quality gates""" quality = self._estimate_quality(content) threshold = self.tuner.parameters["quality_threshold"] if not skip_quality_check and quality < threshold: self.stats["adds_rejected"] += 1 self.tuner.record_outcome("quality_threshold", threshold, False) return None memory_id = f"mem_{hashlib.md5(content.encode()).hexdigest()[:8]}" embedding = self._get_embedding(content) memory = Memory( id=memory_id, content=content, embedding=embedding, namespace=namespace, quality_score=quality, metadata=metadata or {} ) # Add to semantic memory (lowest tier) self.memory_manager.add_to_tier(memory, MemoryTier.SEMANTIC) # Update indices self._embeddings.append(embedding) self._ids.append(memory_id) if HAS_FAISS and self.index is not None: self.index.add(embedding.reshape(1, -1)) tokens = content.lower().split() self._tokenized_docs.append(tokens) if HAS_BM25: self.bm25 = BM25Okapi(self._tokenized_docs) # Create links to similar memories self._create_links_for_new_memory(memory_id, embedding) self.stats["adds"] += 1 self.tuner.record_outcome("quality_threshold", threshold, True) return memory_id def _create_links_for_new_memory(self, memory_id: str, embedding: np.ndarray): """Create neural links to similar memories""" if len(self._ids) < 2: return # Find similar memories similarities = [] for other_id, other_emb in zip(self._ids, self._embeddings): if other_id != memory_id: sim = float(np.dot(embedding, other_emb)) similarities.append((other_id, sim)) # Sort by similarity similarities.sort(key=lambda x: x[1], reverse=True) # Create links for top matches for other_id, sim in similarities[:5]: # Try different link types self.link_manager.create_link( memory_id, other_id, LinkType.SEMANTIC_SIMILARITY, sim ) self.link_manager.create_link( other_id, memory_id, LinkType.SEMANTIC_SIMILARITY, sim ) def search(self, query: str, top_k: int = 5, namespace: Optional[str] = None, use_links: bool = True) -> List[SearchResult]: """ Search with multi-strategy retrieval + neural links """ if not self.memory_manager.semantic_memory: return [] self.stats["searches"] += 1 query_embedding = self._get_embedding(query) threshold = self.tuner.parameters["similarity_threshold"] # Strategy 1: Vector similarity semantic_scores = {} if HAS_FAISS and self.index is not None and self.index.ntotal > 0: k = min(top_k * 3, self.index.ntotal) scores, indices = self.index.search(query_embedding.reshape(1, -1), k) for score, idx in zip(scores[0], indices[0]): if 0 <= idx < len(self._ids): semantic_scores[self._ids[idx]] = float(score) else: for mem_id, emb in zip(self._ids, self._embeddings): semantic_scores[mem_id] = float(np.dot(query_embedding, emb)) # Strategy 2: BM25 bm25_scores = {} if HAS_BM25 and self.bm25 is not None: tokens = query.lower().split() scores = self.bm25.get_scores(tokens) max_score = max(scores) if len(scores) > 0 and max(scores) > 0 else 1 for idx, score in enumerate(scores): if score > 0.1 * max_score: bm25_scores[self._ids[idx]] = float(score / max_score) # Strategy 3: Neural link traversal link_scores = {} if use_links: # Find top semantic matches and traverse their links top_semantic = sorted(semantic_scores.items(), key=lambda x: x[1], reverse=True)[:3] for mem_id, _ in top_semantic: connected = self.link_manager.get_connected(mem_id) for conn_id in connected[:5]: link_scores[conn_id] = link_scores.get(conn_id, 0) + 0.3 # Combine scores (SLM-style weighting) all_ids = set(semantic_scores.keys()) | set(bm25_scores.keys()) | set(link_scores.keys()) if namespace: # Filter by namespace all_ids = {mid for mid in all_ids if mid in self.memory_manager.semantic_memory and self.memory_manager.semantic_memory[mid].namespace == namespace} results = [] for mem_id in all_ids: strat = { "semantic": semantic_scores.get(mem_id, 0), "bm25": bm25_scores.get(mem_id, 0), "links": link_scores.get(mem_id, 0) } combined = ( strat["semantic"] * 0.5 + strat["bm25"] * 0.3 + strat["links"] * 0.2 ) memory = self.memory_manager.semantic_memory.get(mem_id) if memory and combined >= threshold: # Update access tracking memory.access_count += 1 memory.last_accessed = time.time() # Try promotion self.memory_manager.try_promote(mem_id, combined) results.append(SearchResult( id=mem_id, content=memory.content, score=combined, tier=memory.tier, strategy_scores=strat, metadata=memory.metadata )) self.tuner.record_outcome("similarity_threshold", threshold, True) else: self.tuner.record_outcome("similarity_threshold", threshold, False) results.sort(key=lambda x: x.score, reverse=True) return results[:top_k] def get_context(self, query: str, top_k: int = 3, namespace: Optional[str] = None) -> str: """Get formatted context for prompt injection""" results = self.search(query, top_k=top_k, namespace=namespace) if not results: return "" parts = ["[RELEVANT CONTEXT FROM MEMORY]"] for r in results: tier_marker = f"[{r.tier.value.upper()}]" if r.tier != MemoryTier.SEMANTIC else "" parts.append(f"• {tier_marker} {r.content}") parts.append("[END CONTEXT]\n") return "\n".join(parts) def feedback(self, query: str, memory_id: str, relevance: float): """Record feedback for learning""" relevance = max(-1, min(1, relevance)) if memory_id in self.memory_manager.semantic_memory: memory = self.memory_manager.semantic_memory[memory_id] # Update relevance score memory.relevance_score = 0.7 * memory.relevance_score + 0.3 * ((relevance + 1) / 2) # Strengthen/weaken links based on feedback for link_id in self.link_manager.outgoing.get(memory_id, set()): link = self.link_manager.links.get(link_id) if link: link.strength = max(0, min(1, link.strength + relevance * 0.05)) def maintenance_cycle(self): """Run SLM maintenance operations""" # Decay priorities in working memory self.memory_manager.decay_priorities() # Decay link strengths self.link_manager.decay_links() # Prune weak links links_pruned = self.link_manager.prune_weak_links() # Decay memory quality (gentle) memories_decayed = self.memory_manager.decay_memories() self.memory_manager.stats["memories_decayed"] += memories_decayed # Prune stale memories memories_pruned, pruned_ids = self.memory_manager.prune_stale_memories() self.memory_manager.stats["memories_pruned"] += memories_pruned # Clean up links to pruned memories for mem_id in pruned_ids: self.link_manager.remove_links_for_memory(mem_id) # Auto-tune parameters adjustments = self.tuner.auto_tune() return { "links_pruned": links_pruned, "memories_decayed": memories_decayed, "memories_pruned": memories_pruned, "parameter_adjustments": adjustments } def get_stats(self) -> Dict: """Get comprehensive statistics""" return { "memories": { "total": len(self.memory_manager.semantic_memory), **self.memory_manager.get_tier_stats() }, "links": self.link_manager.get_stats(), "utility_predictor": self.utility_predictor.stats, "tuner": { "parameters": self.tuner.parameters, "adjustments": self.tuner.adjustment_count }, "operations": self.stats } def clear(self): """Clear all memory""" self.memory_manager = TieredMemoryManager(self.tuner) self.link_manager = NeuralLinkManager() self._embeddings.clear() self._ids.clear() self._tokenized_docs.clear() self.bm25 = None self._cache.clear() if HAS_FAISS: self.index = faiss.IndexFlatIP(self.embedding_dim) def __len__(self): return len(self.memory_manager.semantic_memory) def __repr__(self): return f"Mnemo(memories={len(self)}, links={len(self.link_manager.links)})" # ============================================================================= # DEMO # ============================================================================= def demo(): print("="*70) print("MNEMO v4: SLM-INSPIRED ARCHITECTURE") print("="*70) m = Mnemo() print(f"\n✓ Initialized: {m}") # Show tuned parameters print("\n📊 Tuned Parameters (adjusted from SLM):") for param, value in m.tuner.parameters.items(): print(f" {param}: {value}") # Add memories print("\n📝 Adding memories...") memories = [ "User prefers Python because it has clean syntax and good libraries", "Previous analysis showed gender bias in Victorian psychiatry diagnoses", "Framework has 5 checkpoints for detecting historical medical bias", "The project deadline is March 15th for the API redesign", "User's coffee preference is cappuccino with oat milk" ] for mem in memories: result = m.add(mem) status = "✓" if result else "✗" print(f" {status} {mem[:50]}...") # Test memory utility predictor print("\n🧠 Memory Utility Predictions:") tests = [ ("What is Python?", False), ("Based on your previous analysis...", True), ("Compare to your earlier findings", True), ("This is a NEW topic", False), ] for query, expected in tests: result = m.should_inject(query) status = "✓" if result == expected else "✗" action = "INJECT" if result else "SKIP" print(f" {status} {action}: {query}") # Search print("\n🔍 Search Results:") results = m.search("previous analysis framework", top_k=3) for r in results: print(f" [{r.tier.value}] score={r.score:.3f}: {r.content[:50]}...") # Show neural links print("\n🔗 Neural Links:") link_stats = m.link_manager.get_stats() print(f" Total links: {link_stats['total_links']}") for lt, count in link_stats['links_by_type'].items(): if count > 0: print(f" {lt}: {count}") # Full stats print("\n📊 Full Statistics:") stats = m.get_stats() print(f" Memories: {stats['memories']['total']}") print(f" Working memory: {stats['memories']['working_memory_count']}") print(f" Links: {stats['links']['total_links']}") print(f" Utility predictions: {stats['utility_predictor']['predictions']}") print("\n" + "="*70) print("✅ Demo complete!") print("="*70) if __name__ == "__main__": demo()