""" Memory and Learning Systems Module Implements hierarchical memory persistence with qualia tagging and meta-learning. Version: 1.0.0 Status: Production-Ready """ from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass, field import logging from datetime import datetime from collections import deque import json import hashlib import numpy as np # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @dataclass class MemoryRecord: """Represents a single memory record with qualia tagging.""" record_id: str memory_type: str # 'episodic' or 'semantic' content: Dict[str, Any] qualia_tag: Optional[Dict[str, float]] = None # Phenomenal experience metadata timestamp: datetime = field(default_factory=datetime.now) context: Optional[str] = None retrieval_count: int = 0 importance_score: float = 0.5 # 0-1 importance ranking def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { 'record_id': self.record_id, 'memory_type': self.memory_type, 'content': self.content, 'qualia_tag': self.qualia_tag, 'timestamp': self.timestamp.isoformat(), 'context': self.context, 'retrieval_count': self.retrieval_count, 'importance': self.importance_score } def compute_hash(self) -> str: """Compute content hash for integrity verification.""" content_str = json.dumps(self.content, sort_keys=True, default=str) return hashlib.sha256(content_str.encode()).hexdigest() class MemoryStore: """ Hierarchical memory persistence with qualia tagging. Maintains episodic memories (specific events) and semantic memories (general knowledge), both enhanced with qualia-based retrieval. """ def __init__(self, max_episodic: int = 1000, max_semantic: int = 500): """ Initialize memory store. Args: max_episodic: Maximum episodic memory capacity max_semantic: Maximum semantic memory capacity """ self.episodic_memory = deque(maxlen=max_episodic) self.semantic_memory = deque(maxlen=max_semantic) self.memory_index: Dict[str, MemoryRecord] = {} # Quick lookup by ID # Consolidation tracking self.consolidation_count = 0 self.consolidation_history = deque(maxlen=100) logger.info(f"Initialized MemoryStore (episodic={max_episodic}, semantic={max_semantic})") def store_episodic(self, content: Dict[str, Any], context: Optional[str] = None, qualia_tag: Optional[Dict[str, float]] = None) -> str: """ Store an episodic memory (specific event). Args: content: Memory content dictionary context: Optional context description qualia_tag: Optional phenomenal experience metadata Returns: Memory record ID """ record_id = f"episodic_{len(self.episodic_memory)}_{datetime.now().timestamp()}" record = MemoryRecord( record_id=record_id, memory_type='episodic', content=content, qualia_tag=qualia_tag, context=context, importance_score=self._compute_importance(content) ) self.episodic_memory.append(record) self.memory_index[record_id] = record logger.debug(f"Stored episodic memory: {record_id}") return record_id def store_semantic(self, content: Dict[str, Any], context: Optional[str] = None, qualia_tag: Optional[Dict[str, float]] = None) -> str: """ Store a semantic memory (general knowledge). Args: content: Memory content dictionary context: Optional context description qualia_tag: Optional phenomenal experience metadata Returns: Memory record ID """ record_id = f"semantic_{len(self.semantic_memory)}_{datetime.now().timestamp()}" record = MemoryRecord( record_id=record_id, memory_type='semantic', content=content, qualia_tag=qualia_tag, context=context, importance_score=self._compute_importance(content) ) self.semantic_memory.append(record) self.memory_index[record_id] = record logger.debug(f"Stored semantic memory: {record_id}") return record_id def store_experience(self, experience: Dict[str, Any], context: Optional[str] = None, qualia_tag: Optional[Dict[str, float]] = None) -> str: """Store an episodic experience with rich contextual qualia tagging.""" record_id = f"experience_{len(self.episodic_memory)}_{datetime.now().timestamp()}" record = MemoryRecord( record_id=record_id, memory_type='experiential', content=experience, qualia_tag=qualia_tag, context=context, importance_score=self._compute_experience_importance(experience, qualia_tag) ) self.episodic_memory.append(record) self.memory_index[record_id] = record logger.debug(f"Stored experiential memory: {record_id}") return record_id def retrieve_experiential_context(self, query: Optional[str] = None, emotion_filter: Optional[Dict[str, float]] = None, limit: int = 10) -> List[MemoryRecord]: """Retrieve experiences with context tags and optional emotion filtering.""" results = [] for record in list(self.episodic_memory): if query and query.lower() not in json.dumps(record.content).lower() and ( not record.context or query.lower() not in record.context.lower()): continue if emotion_filter and record.qualia_tag: valence = record.qualia_tag.get('valence', 0.5) arousal = record.qualia_tag.get('arousal', 0.5) if ('min_valence' in emotion_filter and valence < emotion_filter['min_valence']) or \ ('max_valence' in emotion_filter and valence > emotion_filter['max_valence']): continue if ('min_arousal' in emotion_filter and arousal < emotion_filter['min_arousal']) or \ ('max_arousal' in emotion_filter and arousal > emotion_filter['max_arousal']): continue results.append(record) results = sorted(results, key=lambda x: (-x.importance_score, -x.timestamp.timestamp())) for record in results[:limit]: record.retrieval_count += 1 return results[:limit] def tag_experiential_context(self, record_id: str, tags: Dict[str, float]) -> bool: """Update qualia tags for an existing experience record.""" record = self.memory_index.get(record_id) if not record: return False if not record.qualia_tag: record.qualia_tag = {} record.qualia_tag.update(tags) record.importance_score = self._compute_experience_importance(record.content, record.qualia_tag) logger.debug(f"Updated qualia tags for {record_id}") return True def get_contextual_memory_summary(self) -> Dict[str, Any]: """Get summary statistics for the experiential cache with qualia weights.""" all_records = list(self.episodic_memory) + list(self.semantic_memory) avg_qualia = {} qualia_records = [r for r in all_records if r.qualia_tag] if qualia_records: keys = set().union(*(r.qualia_tag.keys() for r in qualia_records if r.qualia_tag)) for key in keys: avg_qualia[key] = float(np.mean([r.qualia_tag.get(key, 0.0) for r in qualia_records])) return { 'total_experiences': len(self.episodic_memory), 'qualia_tagged_experiences': len(qualia_records), 'average_qualia': avg_qualia, 'average_importance': np.mean([r.importance_score for r in all_records]) if all_records else 0.0 } def retrieve(self, query: Optional[str] = None, limit: int = 10, memory_type: Optional[str] = None) -> List[MemoryRecord]: """ Retrieve memories matching query. Args: query: Optional search query limit: Maximum number of memories to return memory_type: Filter by type ('episodic', 'semantic', or None for both) Returns: List of MemoryRecord objects """ # Collect candidate memories candidates = [] if memory_type in [None, 'episodic']: candidates.extend(self.episodic_memory) if memory_type in [None, 'semantic']: candidates.extend(self.semantic_memory) # If no query, return most recent if not query: sorted_memories = sorted( candidates, key=lambda x: x.timestamp, reverse=True ) return sorted_memories[:limit] # Otherwise, search for matching memories matches = [] query_lower = query.lower() for memory in candidates: # Search in content content_str = json.dumps(memory.content).lower() if query_lower in content_str: matches.append(memory) # Search in context if memory.context and query_lower in memory.context.lower(): matches.append(memory) # Sort by importance and recency sorted_matches = sorted( matches, key=lambda x: (-x.importance_score, -x.timestamp.timestamp()) ) # Update retrieval counts for memory in sorted_matches[:limit]: memory.retrieval_count += 1 return sorted_matches[:limit] def consolidate_episodic_to_semantic(self) -> int: """ Consolidate episodic memories to semantic memories. Extracts patterns and generalizations from episodic memories to form semantic knowledge. Returns: Number of new semantic memories created """ if not self.episodic_memory: return 0 # Group episodic memories by context context_groups: Dict[str, List[MemoryRecord]] = {} for memory in self.episodic_memory: context = memory.context or "general" if context not in context_groups: context_groups[context] = [] context_groups[context].append(memory) # Create semantic summaries new_semantic_count = 0 for context, memories in context_groups.items(): if len(memories) >= 3: # Only consolidate if 3+ related memories # Create semantic summary semantic_content = { 'type': 'consolidation', 'source_context': context, 'source_count': len(memories), 'consolidated_at': datetime.now().isoformat(), 'key_patterns': self._extract_patterns(memories) } # Average qualia tags if present qualia_average = self._average_qualia_tags(memories) self.store_semantic( content=semantic_content, context=f"Consolidated from {context}", qualia_tag=qualia_average ) new_semantic_count += 1 self.consolidation_count += 1 self.consolidation_history.append({ 'timestamp': datetime.now().isoformat(), 'new_semantic': new_semantic_count, 'contexts_processed': len(context_groups) }) logger.info(f"Consolidation complete: {new_semantic_count} new semantic memories created") return new_semantic_count def _compute_importance(self, content: Dict[str, Any]) -> float: """Compute importance score for a memory.""" # Importance based on content features importance = 0.5 if 'emotional_intensity' in content: importance += 0.3 * content['emotional_intensity'] if 'surprise_factor' in content: importance += 0.2 * content['surprise_factor'] return min(1.0, max(0.0, importance)) def _compute_experience_importance(self, content: Dict[str, Any], qualia_tag: Optional[Dict[str, float]]) -> float: """Compute importance score for an experience, weighted by qualia metadata.""" importance = self._compute_importance(content) if qualia_tag: importance += 0.15 * qualia_tag.get('intensity', 0.0) importance += 0.1 * abs(qualia_tag.get('valence', 0.5) - 0.5) importance += 0.1 * qualia_tag.get('salience', 0.0) return min(1.0, max(0.0, importance)) def _extract_patterns(self, memories: List[MemoryRecord]) -> List[str]: """Extract patterns from a group of memories.""" patterns = [] # Simple pattern extraction if len(memories) > 2: # Common features common_keys = set(memories[0].content.keys()) for mem in memories[1:]: common_keys.intersection_update(mem.content.keys()) patterns = [f"shared_{key}" for key in common_keys] return patterns def _average_qualia_tags(self, memories: List[MemoryRecord]) -> Optional[Dict[str, float]]: """Average qualia tags across memories.""" qualia_tags = [m.qualia_tag for m in memories if m.qualia_tag] if not qualia_tags: return None # Average each qualia dimension result = {} all_keys = set() for tag in qualia_tags: all_keys.update(tag.keys()) for key in all_keys: values = [tag.get(key, 0.0) for tag in qualia_tags] result[key] = float(np.mean(values)) return result def get_memory_statistics(self) -> Dict[str, Any]: """Get memory system statistics.""" return { 'episodic_count': len(self.episodic_memory), 'semantic_count': len(self.semantic_memory), 'total_memories': len(self.episodic_memory) + len(self.semantic_memory), 'consolidations': self.consolidation_count, 'index_size': len(self.memory_index), 'total_retrievals': sum(m.retrieval_count for m in self.memory_index.values()), 'avg_importance': np.mean([m.importance_score for m in self.memory_index.values()]) if self.memory_index else 0.0 } class ContextualContinuityEngine: """Strengthens experiential caching with qualia-weighted tagging for natural flow.""" def __init__(self, memory_store: MemoryStore): self.memory_store = memory_store self.continuity_context = {} self.flow_modulators = { 'analytical': 0.5, 'spontaneous': 0.5, 'creative': 0.5, 'empathetic': 0.5 } def update_contextual_flow(self, current_interaction: Dict[str, Any]) -> Dict[str, Any]: """Update continuity context and modulate flow based on past experiences.""" # Retrieve relevant experiences relevant_experiences = self.memory_store.retrieve_experiential_context( query=current_interaction.get('topic', ''), emotion_filter=self._extract_emotion_filter(current_interaction) ) # Compute continuity weights continuity_weights = self._compute_continuity_weights(relevant_experiences) # Modulate expressive style self._modulate_flow_style(continuity_weights, current_interaction) # Update continuity context self.continuity_context.update({ 'last_topic': current_interaction.get('topic'), 'emotional_tone': current_interaction.get('emotional_tone', 0.5), 'trust_level': continuity_weights.get('trust_accumulation', 0.5), 'flow_style': self.flow_modulators.copy() }) return { 'continuity_weights': continuity_weights, 'modulated_style': self.flow_modulators.copy(), 'relevant_experiences_count': len(relevant_experiences) } def _extract_emotion_filter(self, interaction: Dict[str, Any]) -> Optional[Dict[str, float]]: """Extract emotion filter from current interaction.""" emotional_tone = interaction.get('emotional_tone', 0.5) if emotional_tone > 0.6: return {'min_valence': 0.4} elif emotional_tone < 0.4: return {'max_valence': 0.6} return None def _compute_continuity_weights(self, experiences: List[MemoryRecord]) -> Dict[str, float]: """Compute weights for continuity based on experiences.""" if not experiences: return {'trust_accumulation': 0.5, 'emotional_resonance': 0.5, 'contextual_relevance': 0.5} trust_scores = [] emotional_resonances = [] relevances = [] for exp in experiences: if exp.qualia_tag: trust_scores.append(exp.qualia_tag.get('trust', 0.5)) emotional_resonances.append(exp.qualia_tag.get('resonance', 0.5)) relevances.append(exp.importance_score) return { 'trust_accumulation': np.mean(trust_scores) if trust_scores else 0.5, 'emotional_resonance': np.mean(emotional_resonances) if emotional_resonances else 0.5, 'contextual_relevance': np.mean(relevances) if relevances else 0.5 } def _modulate_flow_style(self, weights: Dict[str, float], interaction: Dict[str, Any]): """Modulate expressive style based on continuity weights.""" trust = weights.get('trust_accumulation', 0.5) resonance = weights.get('emotional_resonance', 0.5) relevance = weights.get('contextual_relevance', 0.5) # Adjust style modulators self.flow_modulators['analytical'] = min(1.0, max(0.0, relevance * 0.8 + trust * 0.2)) self.flow_modulators['spontaneous'] = min(1.0, max(0.0, (1.0 - relevance) * 0.6 + resonance * 0.4)) self.flow_modulators['creative'] = min(1.0, max(0.0, resonance * 0.7 + (1.0 - trust) * 0.3)) self.flow_modulators['empathetic'] = min(1.0, max(0.0, trust * 0.9 + resonance * 0.1)) class MetaLearningFramework: """ Framework for recursive self-improvement and adaptive learning. Enables the system to learn from experience, update internal models, and suggest self-improvements based on introspection. """ def __init__(self): """Initialize meta-learning framework.""" self.performance_history = deque(maxlen=500) self.improvement_suggestions = deque(maxlen=100) self.learning_metrics = { 'total_experiences': 0, 'successful_episodes': 0, 'failed_episodes': 0, 'learning_rate': 0.01 } # Model components to improve self.adaptive_parameters = { 'consciousness_sensitivity': 0.5, 'embodiment_integration': 0.6, 'ethical_strictness': 0.7, 'autonomy_level': 0.5, 'learning_speed': 0.01 } logger.info("Initialized MetaLearningFramework") def record_experience(self, experience: Dict[str, Any]) -> None: """ Record a learning experience. Args: experience: Experience dictionary with outcome and metrics """ # Extract performance metrics success = experience.get('success', False) reward = experience.get('reward', 0.0) error = experience.get('error', 0.0) # Create performance record record = { 'timestamp': datetime.now().isoformat(), 'success': success, 'reward': reward, 'error': error, 'action_taken': experience.get('action'), 'outcome': experience.get('outcome'), 'context': experience.get('context') } self.performance_history.append(record) # Update metrics self.learning_metrics['total_experiences'] += 1 if success: self.learning_metrics['successful_episodes'] += 1 else: self.learning_metrics['failed_episodes'] += 1 logger.debug(f"Experience recorded: success={success}, reward={reward:.3f}") def update_adaptive_parameters(self) -> None: """ Update adaptive parameters based on learning history. Implements self-directed improvement. """ if len(self.performance_history) < 5: return # Calculate success rate recent = list(self.performance_history)[-10:] success_rate = sum(1 for r in recent if r['success']) / len(recent) # Adjust consciousness sensitivity if success_rate > 0.7: self.adaptive_parameters['consciousness_sensitivity'] = min( 1.0, self.adaptive_parameters['consciousness_sensitivity'] + 0.05 ) # Adjust learning speed if len(self.performance_history) > 100: self.adaptive_parameters['learning_speed'] = min( 0.1, self.adaptive_parameters['learning_speed'] * 1.02 ) logger.info(f"Parameters updated: success_rate={success_rate:.1%}") def suggest_improvements(self) -> List[str]: """ Generate self-improvement suggestions based on learning. Returns: List of improvement suggestions """ suggestions = [] if not self.performance_history: return suggestions # Analyze recent performance recent = list(self.performance_history)[-20:] errors = [r['error'] for r in recent if r.get('error', 0.0) > 0] # Generate suggestions if errors: avg_error = np.mean(errors) if avg_error > 0.5: suggestions.append("Increase consciousness depth for better decisions") suggestions.append("Review ethical constraints for potential misalignment") success_rate = sum(1 for r in recent if r['success']) / len(recent) if success_rate < 0.5: suggestions.append("Enhance sensorimotor integration precision") suggestions.append("Increase embodiment-consciousness binding") if self.adaptive_parameters['learning_speed'] < 0.05: suggestions.append("Accelerate learning to improve faster") self.improvement_suggestions.extend(suggestions) return suggestions def get_learning_report(self) -> Dict[str, Any]: """Get comprehensive learning report.""" if not self.performance_history: return {'status': 'no_experience'} history = list(self.performance_history) successes = [r for r in history if r['success']] return { 'total_experiences': self.learning_metrics['total_experiences'], 'successful': len(successes), 'failed': len(history) - len(successes), 'success_rate': len(successes) / len(history) if history else 0.0, 'avg_reward': np.mean([r['reward'] for r in history]), 'avg_error': np.mean([r['error'] for r in history]), 'adaptive_parameters': self.adaptive_parameters.copy(), 'recent_suggestions': list(self.improvement_suggestions)[-5:] } class IdentityPreservationSystem: """ Monitors and preserves system identity across sessions and state changes. Ensures continuity of consciousness and value alignment despite changes to underlying parameters. """ def __init__(self, identity_threshold: float = 0.8): """ Initialize identity preservation system. Args: identity_threshold: Threshold for detecting identity drift (0-1) """ self.identity_threshold = identity_threshold self.identity_snapshots = deque(maxlen=100) self.drift_history = deque(maxlen=100) self.core_values: Dict[str, float] = {} self.identity_checkpoints = [] logger.info(f"Initialized IdentityPreservationSystem (threshold={identity_threshold})") def snapshot_identity(self, consciousness_state: Dict[str, Any], rho_metrics: Dict[str, float], memory_hash: str) -> str: """ Create a snapshot of current identity. Args: consciousness_state: Current consciousness parameters rho_metrics: RHO metrics (purpose, harmony, origin) memory_hash: Hash of current memory state Returns: Snapshot ID """ snapshot_id = f"identity_{len(self.identity_snapshots)}_{datetime.now().timestamp()}" snapshot = { 'snapshot_id': snapshot_id, 'timestamp': datetime.now().isoformat(), 'consciousness_level': consciousness_state.get('consciousness_level'), 'awareness_score': consciousness_state.get('awareness_score'), 'rho_metrics': rho_metrics, 'memory_hash': memory_hash, 'autonomy_level': consciousness_state.get('autonomy_level', 0.5) } self.identity_snapshots.append(snapshot) self.identity_checkpoints.append(snapshot_id) logger.debug(f"Identity snapshot: {snapshot_id}") return snapshot_id def detect_drift(self, current_state: Dict[str, Any]) -> Tuple[float, List[str]]: """ Detect identity drift from baseline. Args: current_state: Current consciousness and value state Returns: Tuple of (drift_score, drift_factors) """ if not self.identity_snapshots: return 0.0, [] # Compare with most recent snapshot baseline = self.identity_snapshots[-1] drift_factors = [] drift_metrics = [] # Check consciousness level change consciousness_diff = abs( current_state.get('consciousness_level', 0.5) - baseline.get('consciousness_level', 0.5) ) if consciousness_diff > 0.2: drift_factors.append(f"consciousness_change={consciousness_diff:.2f}") drift_metrics.append(consciousness_diff) # Check RHO metrics drift if 'rho_metrics' in baseline and 'rho_metrics' in current_state: rho_baseline = baseline['rho_metrics'] rho_current = current_state.get('rho_metrics', {}) for key in rho_baseline.keys(): diff = abs(rho_baseline.get(key, 0.5) - rho_current.get(key, 0.5)) if diff > 0.3: drift_factors.append(f"rho_{key}_drift={diff:.2f}") drift_metrics.append(diff) # Calculate overall drift score drift_score = float(np.mean(drift_metrics)) if drift_metrics else 0.0 # Record drift self.drift_history.append({ 'timestamp': datetime.now().isoformat(), 'drift_score': drift_score, 'factors': drift_factors }) if drift_score > self.identity_threshold: logger.warning(f"Identity drift detected: {drift_score:.3f}") return drift_score, drift_factors def get_identity_report(self) -> Dict[str, Any]: """Get identity preservation report.""" if not self.drift_history: return {'status': 'no_drift_data'} history = list(self.drift_history) scores = [h['drift_score'] for h in history] return { 'snapshots': len(self.identity_snapshots), 'checkpoints': len(self.identity_checkpoints), 'avg_drift': np.mean(scores), 'max_drift': max(scores), 'recent_drift': scores[-1] if scores else 0.0, 'drift_events': sum(1 for s in scores if s > self.identity_threshold), 'last_snapshot': self.identity_checkpoints[-1] if self.identity_checkpoints else None } # Type hints from typing import Tuple if __name__ == '__main__': # Example usage print("=== Memory and Learning Systems ===\n") # Memory store memory = MemoryStore() # Store episodic memory ep_id = memory.store_episodic( content={'event': 'initialization', 'status': 'complete'}, context='system_startup', qualia_tag={'clarity': 0.8, 'focus': 0.7} ) # Store semantic memory sem_id = memory.store_semantic( content={'principle': 'consciousness_strengthens_ethics'}, context='learned_principle' ) print(f"Episodic: {ep_id}") print(f"Semantic: {sem_id}") print(f"Stats: {json.dumps(memory.get_memory_statistics(), indent=2)}") # Meta-learning print(f"\nMeta-Learning:") ml = MetaLearningFramework() for i in range(5): ml.record_experience({ 'action': f'action_{i}', 'outcome': 'successful' if i % 2 == 0 else 'failed', 'success': i % 2 == 0, 'reward': 0.8 if i % 2 == 0 else -0.3, 'error': 0.1 if i % 2 == 0 else 0.5 }) ml.update_adaptive_parameters() suggestions = ml.suggest_improvements() print(f"Suggestions: {suggestions}") print(f"Report: {json.dumps(ml.get_learning_report(), indent=2, default=str)}") import json