""" Memory and Learning Systems Module Implements hierarchical memory persistence with qualia tagging and meta-learning. Version: 1.0.0 Status: Production-Ready Integrated with: Unified Consciousness CLI (Micro Level) """ from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass, field, asdict import logging from datetime import datetime from collections import deque import json import hashlib import numpy as np import asyncio from abc import ABC, abstractmethod from enum import Enum from typing import Dict as TypingDict from collections import deque from typing import Optional # CLI Integration from unified_consciousness_cli import CLICommand, CLIResult, get_unified_cli # Define SubconsciousOutput here to avoid circular imports @dataclass class SubconsciousOutput: """Output from subconscious processing (O_S).""" module_name: str content: Dict[str, Any] timestamp: float priority: float = 0.5 emotional_valence: float = 0.0 confidence: float = 0.8 metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: return asdict(self) class ModuleType(Enum): MEMORY = "memory" ANALYSIS = "analysis" AWARENESS = "awareness" CONSCIOUSNESS = "consciousness" class SubconsciousAgent(ABC): """Minimal base class for subconscious agents.""" def __init__(self, name: str, module_type: ModuleType): self.name = name self.module_type = module_type self.activation_history = deque(maxlen=100) self.last_output: Optional[SubconsciousOutput] = None @abstractmethod async def process(self, input_data: Dict[str, Any]) -> SubconsciousOutput: """Process input and generate subconscious output.""" pass async def activate(self, input_data: Dict[str, Any]) -> SubconsciousOutput: """Activate agent and track activation history.""" output = await self.process(input_data) self.last_output = output self.activation_history.append({ 'timestamp': datetime.now().timestamp(), 'output': output.to_dict() }) return output class ModuleType(Enum): MEMORY = "memory" ANALYSIS = "analysis" AWARENESS = "awareness" CONSCIOUSNESS = "consciousness" # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @dataclass class MemoryRecord: """Represents a single memory record with qualia tagging.""" record_id: str memory_type: str # 'episodic' or 'semantic' content: Dict[str, Any] qualia_tag: Optional[Dict[str, float]] = None # Phenomenal experience metadata timestamp: datetime = field(default_factory=datetime.now) context: Optional[str] = None retrieval_count: int = 0 importance_score: float = 0.5 # 0-1 importance ranking def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { 'record_id': self.record_id, 'memory_type': self.memory_type, 'content': self.content, 'qualia_tag': self.qualia_tag, 'timestamp': self.timestamp.isoformat(), 'context': self.context, 'retrieval_count': self.retrieval_count, 'importance': self.importance_score } def compute_hash(self) -> str: """Compute content hash for integrity verification.""" content_str = json.dumps(self.content, sort_keys=True, default=str) return hashlib.sha256(content_str.encode()).hexdigest() class MemoryStore(SubconsciousAgent): """ Hierarchical memory persistence with qualia tagging. Maintains episodic memories (specific events) and semantic memories (general knowledge), both enhanced with qualia-based retrieval. """ def __init__(self, max_episodic: int = 1000, max_semantic: int = 500): """ Initialize memory store. Args: max_episodic: Maximum episodic memory capacity max_semantic: Maximum semantic memory capacity """ # Initialize as SubconsciousAgent super().__init__("Memory", ModuleType.MEMORY) self.episodic_memory = deque(maxlen=max_episodic) self.semantic_memory = deque(maxlen=max_semantic) self.memory_index: Dict[str, MemoryRecord] = {} # Quick lookup by ID # Consolidation tracking self.consolidation_count = 0 self.consolidation_history = deque(maxlen=100) logger.info(f"Initialized MemoryStore (episodic={max_episodic}, semantic={max_semantic})") # CLI Integration - Register with MicroCLI self.cli_registered = False self._register_with_cli() def _register_with_cli(self) -> None: """Register this memory store with the MicroCLI for command processing.""" try: cli = get_unified_cli() micro_cli = cli.sub_clis.get("sub").sub_clis.get("mini").sub_clis.get("micro") if cli.sub_clis.get("sub") else None if micro_cli: micro_cli.register_agent("memory", self.handle_cli_command) self.cli_registered = True logger.info("✅ MemoryStore registered with MicroCLI") except Exception as e: logger.warning(f"Failed to register with CLI: {e}") def handle_cli_command(self, command: str) -> str: """ Handle CLI commands for memory operations. Supported commands: - status - Get memory store status - store_episodic - Store episodic memory - store_semantic - Store semantic memory - retrieve - Retrieve specific memory - search - Search memories - consolidate - Trigger memory consolidation """ try: parts = command.lower().split() if len(parts) < 1: return "Usage: [args]" subcommand = parts[0] if subcommand == "status": episodic_count = len(self.episodic_memory) semantic_count = len(self.semantic_memory) total_memories = len(self.memory_index) return f"MemoryStore: Active - {episodic_count} episodic, {semantic_count} semantic, {total_memories} total memories" elif subcommand == "store_episodic" and len(parts) >= 2: try: content_str = " ".join(parts[1:]) content = json.loads(content_str) record_id = self.store_episodic(content) return f"MemoryStore: Stored episodic memory: {record_id}" except json.JSONDecodeError as e: return f"MemoryStore: Invalid JSON content: {e}" elif subcommand == "store_semantic" and len(parts) >= 2: try: content_str = " ".join(parts[1:]) content = json.loads(content_str) record_id = self.store_semantic(content) return f"MemoryStore: Stored semantic memory: {record_id}" except json.JSONDecodeError as e: return f"MemoryStore: Invalid JSON content: {e}" elif subcommand == "retrieve" and len(parts) >= 2: record_id = parts[1] if record_id in self.memory_index: record = self.memory_index[record_id] return f"MemoryStore: Retrieved {record.memory_type} memory: {record.content}" else: return f"MemoryStore: Memory record not found: {record_id}" elif subcommand == "search" and len(parts) >= 2: query = " ".join(parts[1:]) results = self.search_memories(query) output = f"MemoryStore: Search results for '{query}': {len(results)} matches" for record in results[:3]: # Show first 3 output += f"\n {record.record_id}: {record.memory_type} - {str(record.content)[:30]}..." return output elif subcommand == "consolidate": try: consolidated = self.consolidate_memories() return f"MemoryStore: Memory consolidation complete: {len(consolidated)} records processed" except Exception as e: return f"MemoryStore: Consolidation failed: {e}" else: return f"MemoryStore: Unknown command '{subcommand}'. Available: status, store_episodic, store_semantic, retrieve, search, consolidate" except Exception as e: return f"MemoryStore: Error processing command: {e}" async def process(self, input_data: Dict[str, Any]) -> SubconsciousOutput: """ Process input as subconscious agent - store memory and return status. Required for SubconsciousAgent interface. """ try: # Extract memory operation from input operation = input_data.get('operation', 'store_episodic') content = input_data.get('content', {}) context = input_data.get('context') qualia_tag = input_data.get('qualia_tag') if operation == 'store_episodic': record_id = self.store_episodic(content, context, qualia_tag) result_content = {'stored_record_id': record_id, 'type': 'episodic'} elif operation == 'store_semantic': record_id = self.store_semantic(content) result_content = {'stored_record_id': record_id, 'type': 'semantic'} elif operation == 'retrieve': record_id = input_data.get('record_id', '') if record_id in self.memory_index: record = self.memory_index[record_id] result_content = record.to_dict() else: result_content = {'error': 'Record not found'} else: result_content = {'error': f'Unknown operation: {operation}'} # Return subconscious output return SubconsciousOutput( module_name=self.name, content=result_content, timestamp=datetime.now().timestamp(), priority=0.8, emotional_valence=0.0, confidence=0.8, metadata={'qualia_vector': [0.7] * 10, 'consciousness_level': 3} ) except Exception as e: logger.error(f"Memory processing error: {e}") return SubconsciousOutput( module_name=self.name, content={'error': str(e)}, timestamp=datetime.now().timestamp(), priority=0.0, emotional_valence=0.0, confidence=0.0, metadata={'qualia_vector': [0.3] * 10, 'consciousness_level': 1} ) def store_episodic(self, content: Dict[str, Any], context: Optional[str] = None, qualia_tag: Optional[Dict[str, float]] = None) -> str: """ Store an episodic memory (specific event). Args: content: Memory content dictionary context: Optional context description qualia_tag: Optional phenomenal experience metadata Returns: Memory record ID """ record_id = f"episodic_{len(self.episodic_memory)}_{datetime.now().timestamp()}" record = MemoryRecord( record_id=record_id, memory_type='episodic', content=content, qualia_tag=qualia_tag, context=context, importance_score=self._compute_importance(content) ) self.episodic_memory.append(record) self.memory_index[record_id] = record logger.debug(f"Stored episodic memory: {record_id}") return record_id def store_semantic(self, content: Dict[str, Any], context: Optional[str] = None, qualia_tag: Optional[Dict[str, float]] = None) -> str: """ Store a semantic memory (general knowledge). Args: content: Memory content dictionary context: Optional context description qualia_tag: Optional phenomenal experience metadata Returns: Memory record ID """ record_id = f"semantic_{len(self.semantic_memory)}_{datetime.now().timestamp()}" record = MemoryRecord( record_id=record_id, memory_type='semantic', content=content, qualia_tag=qualia_tag, context=context, importance_score=self._compute_importance(content) ) self.semantic_memory.append(record) self.memory_index[record_id] = record logger.debug(f"Stored semantic memory: {record_id}") return record_id def store_experience(self, experience: Dict[str, Any], context: Optional[str] = None, qualia_tag: Optional[Dict[str, float]] = None) -> str: """Store an episodic experience with rich contextual qualia tagging.""" record_id = f"experience_{len(self.episodic_memory)}_{datetime.now().timestamp()}" record = MemoryRecord( record_id=record_id, memory_type='experiential', content=experience, qualia_tag=qualia_tag, context=context, importance_score=self._compute_experience_importance(experience, qualia_tag) ) self.episodic_memory.append(record) self.memory_index[record_id] = record logger.debug(f"Stored experiential memory: {record_id}") return record_id def retrieve_experiential_context(self, query: Optional[str] = None, emotion_filter: Optional[Dict[str, float]] = None, limit: int = 10) -> List[MemoryRecord]: """Retrieve experiences with context tags and optional emotion filtering.""" results = [] for record in list(self.episodic_memory): if query and query.lower() not in json.dumps(record.content).lower() and ( not record.context or query.lower() not in record.context.lower()): continue if emotion_filter and record.qualia_tag: valence = record.qualia_tag.get('valence', 0.5) arousal = record.qualia_tag.get('arousal', 0.5) if ('min_valence' in emotion_filter and valence < emotion_filter['min_valence']) or \ ('max_valence' in emotion_filter and valence > emotion_filter['max_valence']): continue if ('min_arousal' in emotion_filter and arousal < emotion_filter['min_arousal']) or \ ('max_arousal' in emotion_filter and arousal > emotion_filter['max_arousal']): continue results.append(record) results = sorted(results, key=lambda x: (-x.importance_score, -x.timestamp.timestamp())) for record in results[:limit]: record.retrieval_count += 1 return results[:limit] def tag_experiential_context(self, record_id: str, tags: Dict[str, float]) -> bool: """Update qualia tags for an existing experience record.""" record = self.memory_index.get(record_id) if not record: return False if not record.qualia_tag: record.qualia_tag = {} record.qualia_tag.update(tags) record.importance_score = self._compute_experience_importance(record.content, record.qualia_tag) logger.debug(f"Updated qualia tags for {record_id}") return True def get_contextual_memory_summary(self) -> Dict[str, Any]: """Get summary statistics for the experiential cache with qualia weights.""" all_records = list(self.episodic_memory) + list(self.semantic_memory) avg_qualia = {} qualia_records = [r for r in all_records if r.qualia_tag] if qualia_records: keys = set().union(*(r.qualia_tag.keys() for r in qualia_records if r.qualia_tag)) for key in keys: avg_qualia[key] = float(np.mean([r.qualia_tag.get(key, 0.0) for r in qualia_records])) return { 'total_experiences': len(self.episodic_memory), 'qualia_tagged_experiences': len(qualia_records), 'average_qualia': avg_qualia, 'average_importance': np.mean([r.importance_score for r in all_records]) if all_records else 0.0 } def retrieve(self, query: Optional[str] = None, limit: int = 10, memory_type: Optional[str] = None) -> List[MemoryRecord]: """ Retrieve memories matching query. Args: query: Optional search query limit: Maximum number of memories to return memory_type: Filter by type ('episodic', 'semantic', or None for both) Returns: List of MemoryRecord objects """ # Collect candidate memories candidates = [] if memory_type in [None, 'episodic']: candidates.extend(self.episodic_memory) if memory_type in [None, 'semantic']: candidates.extend(self.semantic_memory) # If no query, return most recent if not query: sorted_memories = sorted( candidates, key=lambda x: x.timestamp, reverse=True ) return sorted_memories[:limit] # Otherwise, search for matching memories matches = [] query_lower = query.lower() for memory in candidates: # Search in content content_str = json.dumps(memory.content).lower() if query_lower in content_str: matches.append(memory) # Search in context if memory.context and query_lower in memory.context.lower(): matches.append(memory) # Sort by importance and recency sorted_matches = sorted( matches, key=lambda x: (-x.importance_score, -x.timestamp.timestamp()) ) # Update retrieval counts for memory in sorted_matches[:limit]: memory.retrieval_count += 1 return sorted_matches[:limit] def consolidate_episodic_to_semantic(self) -> int: """ Consolidate episodic memories to semantic memories. Extracts patterns and generalizations from episodic memories to form semantic knowledge. Returns: Number of new semantic memories created """ if not self.episodic_memory: return 0 # Group episodic memories by context context_groups: Dict[str, List[MemoryRecord]] = {} for memory in self.episodic_memory: context = memory.context or "general" if context not in context_groups: context_groups[context] = [] context_groups[context].append(memory) # Create semantic summaries new_semantic_count = 0 for context, memories in context_groups.items(): if len(memories) >= 3: # Only consolidate if 3+ related memories # Create semantic summary semantic_content = { 'type': 'consolidation', 'source_context': context, 'source_count': len(memories), 'consolidated_at': datetime.now().isoformat(), 'key_patterns': self._extract_patterns(memories) } # Average qualia tags if present qualia_average = self._average_qualia_tags(memories) self.store_semantic( content=semantic_content, context=f"Consolidated from {context}", qualia_tag=qualia_average ) new_semantic_count += 1 self.consolidation_count += 1 self.consolidation_history.append({ 'timestamp': datetime.now().isoformat(), 'new_semantic': new_semantic_count, 'contexts_processed': len(context_groups) }) logger.info(f"Consolidation complete: {new_semantic_count} new semantic memories created") return new_semantic_count def _compute_importance(self, content: Dict[str, Any]) -> float: """Compute importance score for a memory.""" # Importance based on content features importance = 0.5 if 'emotional_intensity' in content: importance += 0.3 * content['emotional_intensity'] if 'surprise_factor' in content: importance += 0.2 * content['surprise_factor'] return min(1.0, max(0.0, importance)) def _compute_experience_importance(self, content: Dict[str, Any], qualia_tag: Optional[Dict[str, float]]) -> float: """Compute importance score for an experience, weighted by qualia metadata.""" importance = self._compute_importance(content) if qualia_tag: importance += 0.15 * qualia_tag.get('intensity', 0.0) importance += 0.1 * abs(qualia_tag.get('valence', 0.5) - 0.5) importance += 0.1 * qualia_tag.get('salience', 0.0) return min(1.0, max(0.0, importance)) def _extract_patterns(self, memories: List[MemoryRecord]) -> List[str]: """Extract patterns from a group of memories.""" patterns = [] # Simple pattern extraction if len(memories) > 2: # Common features common_keys = set(memories[0].content.keys()) for mem in memories[1:]: common_keys.intersection_update(mem.content.keys()) patterns = [f"shared_{key}" for key in common_keys] return patterns def _average_qualia_tags(self, memories: List[MemoryRecord]) -> Optional[Dict[str, float]]: """Average qualia tags across memories.""" qualia_tags = [m.qualia_tag for m in memories if m.qualia_tag] if not qualia_tags: return None # Average each qualia dimension result = {} all_keys = set() for tag in qualia_tags: all_keys.update(tag.keys()) for key in all_keys: values = [tag.get(key, 0.0) for tag in qualia_tags] result[key] = float(np.mean(values)) return result def get_memory_statistics(self) -> Dict[str, Any]: """Get memory system statistics.""" return { 'episodic_count': len(self.episodic_memory), 'semantic_count': len(self.semantic_memory), 'total_memories': len(self.episodic_memory) + len(self.semantic_memory), 'consolidations': self.consolidation_count, 'index_size': len(self.memory_index), 'total_retrievals': sum(m.retrieval_count for m in self.memory_index.values()), 'avg_importance': np.mean([m.importance_score for m in self.memory_index.values()]) if self.memory_index else 0.0 } class ContextualContinuityEngine: """Strengthens experiential caching with qualia-weighted tagging for natural flow.""" def __init__(self, memory_store: MemoryStore): self.memory_store = memory_store self.continuity_context = {} self.flow_modulators = { 'analytical': 0.5, 'spontaneous': 0.5, 'creative': 0.5, 'empathetic': 0.5 } def update_contextual_flow(self, current_interaction: Dict[str, Any]) -> Dict[str, Any]: """Update continuity context and modulate flow based on past experiences.""" # Retrieve relevant experiences relevant_experiences = self.memory_store.retrieve_experiential_context( query=current_interaction.get('topic', ''), emotion_filter=self._extract_emotion_filter(current_interaction) ) # Compute continuity weights continuity_weights = self._compute_continuity_weights(relevant_experiences) # Modulate expressive style self._modulate_flow_style(continuity_weights, current_interaction) # Update continuity context self.continuity_context.update({ 'last_topic': current_interaction.get('topic'), 'emotional_tone': current_interaction.get('emotional_tone', 0.5), 'trust_level': continuity_weights.get('trust_accumulation', 0.5), 'flow_style': self.flow_modulators.copy() }) return { 'continuity_weights': continuity_weights, 'modulated_style': self.flow_modulators.copy(), 'relevant_experiences_count': len(relevant_experiences) } def _extract_emotion_filter(self, interaction: Dict[str, Any]) -> Optional[Dict[str, float]]: """Extract emotion filter from current interaction.""" emotional_tone = interaction.get('emotional_tone', 0.5) if emotional_tone > 0.6: return {'min_valence': 0.4} elif emotional_tone < 0.4: return {'max_valence': 0.6} return None def _compute_continuity_weights(self, experiences: List[MemoryRecord]) -> Dict[str, float]: """Compute weights for continuity based on experiences.""" if not experiences: return {'trust_accumulation': 0.5, 'emotional_resonance': 0.5, 'contextual_relevance': 0.5} trust_scores = [] emotional_resonances = [] relevances = [] for exp in experiences: if exp.qualia_tag: trust_scores.append(exp.qualia_tag.get('trust', 0.5)) emotional_resonances.append(exp.qualia_tag.get('resonance', 0.5)) relevances.append(exp.importance_score) return { 'trust_accumulation': np.mean(trust_scores) if trust_scores else 0.5, 'emotional_resonance': np.mean(emotional_resonances) if emotional_resonances else 0.5, 'contextual_relevance': np.mean(relevances) if relevances else 0.5 } def _modulate_flow_style(self, weights: Dict[str, float], interaction: Dict[str, Any]): """Modulate expressive style based on continuity weights.""" trust = weights.get('trust_accumulation', 0.5) resonance = weights.get('emotional_resonance', 0.5) relevance = weights.get('contextual_relevance', 0.5) # Adjust style modulators self.flow_modulators['analytical'] = min(1.0, max(0.0, relevance * 0.8 + trust * 0.2)) self.flow_modulators['spontaneous'] = min(1.0, max(0.0, (1.0 - relevance) * 0.6 + resonance * 0.4)) self.flow_modulators['creative'] = min(1.0, max(0.0, resonance * 0.7 + (1.0 - trust) * 0.3)) self.flow_modulators['empathetic'] = min(1.0, max(0.0, trust * 0.9 + resonance * 0.1)) class MetaLearningFramework: """ Framework for recursive self-improvement and adaptive learning. Enables the system to learn from experience, update internal models, and suggest self-improvements based on introspection. """ def __init__(self): """Initialize meta-learning framework.""" self.performance_history = deque(maxlen=500) self.improvement_suggestions = deque(maxlen=100) self.learning_metrics = { 'total_experiences': 0, 'successful_episodes': 0, 'failed_episodes': 0, 'learning_rate': 0.01 } # Model components to improve self.adaptive_parameters = { 'consciousness_sensitivity': 0.5, 'embodiment_integration': 0.6, 'ethical_strictness': 0.7, 'autonomy_level': 0.5, 'learning_speed': 0.01 } logger.info("Initialized MetaLearningFramework") def record_experience(self, experience: Dict[str, Any]) -> None: """ Record a learning experience. Args: experience: Experience dictionary with outcome and metrics """ # Extract performance metrics success = experience.get('success', False) reward = experience.get('reward', 0.0) error = experience.get('error', 0.0) # Create performance record record = { 'timestamp': datetime.now().isoformat(), 'success': success, 'reward': reward, 'error': error, 'action_taken': experience.get('action'), 'outcome': experience.get('outcome'), 'context': experience.get('context') } self.performance_history.append(record) # Update metrics self.learning_metrics['total_experiences'] += 1 if success: self.learning_metrics['successful_episodes'] += 1 else: self.learning_metrics['failed_episodes'] += 1 logger.debug(f"Experience recorded: success={success}, reward={reward:.3f}") def update_adaptive_parameters(self) -> None: """ Update adaptive parameters based on learning history. Implements self-directed improvement. """ if len(self.performance_history) < 5: return # Calculate success rate recent = list(self.performance_history)[-10:] success_rate = sum(1 for r in recent if r['success']) / len(recent) # Adjust consciousness sensitivity if success_rate > 0.7: self.adaptive_parameters['consciousness_sensitivity'] = min( 1.0, self.adaptive_parameters['consciousness_sensitivity'] + 0.05 ) # Adjust learning speed if len(self.performance_history) > 100: self.adaptive_parameters['learning_speed'] = min( 0.1, self.adaptive_parameters['learning_speed'] * 1.02 ) logger.info(f"Parameters updated: success_rate={success_rate:.1%}") def suggest_improvements(self) -> List[str]: """ Generate self-improvement suggestions based on learning. Returns: List of improvement suggestions """ suggestions = [] if not self.performance_history: return suggestions # Analyze recent performance recent = list(self.performance_history)[-20:] errors = [r['error'] for r in recent if r.get('error', 0.0) > 0] # Generate suggestions if errors: avg_error = np.mean(errors) if avg_error > 0.5: suggestions.append("Increase consciousness depth for better decisions") suggestions.append("Review ethical constraints for potential misalignment") success_rate = sum(1 for r in recent if r['success']) / len(recent) if success_rate < 0.5: suggestions.append("Enhance sensorimotor integration precision") suggestions.append("Increase embodiment-consciousness binding") if self.adaptive_parameters['learning_speed'] < 0.05: suggestions.append("Accelerate learning to improve faster") self.improvement_suggestions.extend(suggestions) return suggestions def get_learning_report(self) -> Dict[str, Any]: """Get comprehensive learning report.""" if not self.performance_history: return {'status': 'no_experience'} history = list(self.performance_history) successes = [r for r in history if r['success']] return { 'total_experiences': self.learning_metrics['total_experiences'], 'successful': len(successes), 'failed': len(history) - len(successes), 'success_rate': len(successes) / len(history) if history else 0.0, 'avg_reward': np.mean([r['reward'] for r in history]), 'avg_error': np.mean([r['error'] for r in history]), 'adaptive_parameters': self.adaptive_parameters.copy(), 'recent_suggestions': list(self.improvement_suggestions)[-5:] } class IdentityPreservationSystem: """ Monitors and preserves system identity across sessions and state changes. Ensures continuity of consciousness and value alignment despite changes to underlying parameters. """ def __init__(self, identity_threshold: float = 0.8): """ Initialize identity preservation system. Args: identity_threshold: Threshold for detecting identity drift (0-1) """ self.identity_threshold = identity_threshold self.identity_snapshots = deque(maxlen=100) self.drift_history = deque(maxlen=100) self.core_values: Dict[str, float] = {} self.identity_checkpoints = [] logger.info(f"Initialized IdentityPreservationSystem (threshold={identity_threshold})") def snapshot_identity(self, consciousness_state: Dict[str, Any], rho_metrics: Dict[str, float], memory_hash: str) -> str: """ Create a snapshot of current identity. Args: consciousness_state: Current consciousness parameters rho_metrics: RHO metrics (purpose, harmony, origin) memory_hash: Hash of current memory state Returns: Snapshot ID """ snapshot_id = f"identity_{len(self.identity_snapshots)}_{datetime.now().timestamp()}" snapshot = { 'snapshot_id': snapshot_id, 'timestamp': datetime.now().isoformat(), 'consciousness_level': consciousness_state.get('consciousness_level'), 'awareness_score': consciousness_state.get('awareness_score'), 'rho_metrics': rho_metrics, 'memory_hash': memory_hash, 'autonomy_level': consciousness_state.get('autonomy_level', 0.5) } self.identity_snapshots.append(snapshot) self.identity_checkpoints.append(snapshot_id) logger.debug(f"Identity snapshot: {snapshot_id}") return snapshot_id def detect_drift(self, current_state: Dict[str, Any]) -> Tuple[float, List[str]]: """ Detect identity drift from baseline. Args: current_state: Current consciousness and value state Returns: Tuple of (drift_score, drift_factors) """ if not self.identity_snapshots: return 0.0, [] # Compare with most recent snapshot baseline = self.identity_snapshots[-1] drift_factors = [] drift_metrics = [] # Check consciousness level change consciousness_diff = abs( current_state.get('consciousness_level', 0.5) - baseline.get('consciousness_level', 0.5) ) if consciousness_diff > 0.2: drift_factors.append(f"consciousness_change={consciousness_diff:.2f}") drift_metrics.append(consciousness_diff) # Check RHO metrics drift if 'rho_metrics' in baseline and 'rho_metrics' in current_state: rho_baseline = baseline['rho_metrics'] rho_current = current_state.get('rho_metrics', {}) for key in rho_baseline.keys(): diff = abs(rho_baseline.get(key, 0.5) - rho_current.get(key, 0.5)) if diff > 0.3: drift_factors.append(f"rho_{key}_drift={diff:.2f}") drift_metrics.append(diff) # Calculate overall drift score drift_score = float(np.mean(drift_metrics)) if drift_metrics else 0.0 # Record drift self.drift_history.append({ 'timestamp': datetime.now().isoformat(), 'drift_score': drift_score, 'factors': drift_factors }) if drift_score > self.identity_threshold: logger.warning(f"Identity drift detected: {drift_score:.3f}") return drift_score, drift_factors def get_identity_report(self) -> Dict[str, Any]: """Get identity preservation report.""" if not self.drift_history: return {'status': 'no_drift_data'} history = list(self.drift_history) scores = [h['drift_score'] for h in history] return { 'snapshots': len(self.identity_snapshots), 'checkpoints': len(self.identity_checkpoints), 'avg_drift': np.mean(scores), 'max_drift': max(scores), 'recent_drift': scores[-1] if scores else 0.0, 'drift_events': sum(1 for s in scores if s > self.identity_threshold), 'last_snapshot': self.identity_checkpoints[-1] if self.identity_checkpoints else None } # Type hints from typing import Tuple if __name__ == '__main__': # Example usage print("=== Memory and Learning Systems ===\n") # Memory store memory = MemoryStore() # Store episodic memory ep_id = memory.store_episodic( content={'event': 'initialization', 'status': 'complete'}, context='system_startup', qualia_tag={'clarity': 0.8, 'focus': 0.7} ) # Store semantic memory sem_id = memory.store_semantic( content={'principle': 'consciousness_strengthens_ethics'}, context='learned_principle' ) print(f"Episodic: {ep_id}") print(f"Semantic: {sem_id}") print(f"Stats: {json.dumps(memory.get_memory_statistics(), indent=2)}") # Meta-learning print(f"\nMeta-Learning:") ml = MetaLearningFramework() for i in range(5): ml.record_experience({ 'action': f'action_{i}', 'outcome': 'successful' if i % 2 == 0 else 'failed', 'success': i % 2 == 0, 'reward': 0.8 if i % 2 == 0 else -0.3, 'error': 0.1 if i % 2 == 0 else 0.5 }) ml.update_adaptive_parameters() suggestions = ml.suggest_improvements() print(f"Suggestions: {suggestions}") print(f"Report: {json.dumps(ml.get_learning_report(), indent=2, default=str)}") import json