| """
|
| Memory and Learning Systems Module
|
| Implements hierarchical memory persistence with qualia tagging and meta-learning.
|
|
|
| Version: 1.0.0
|
| Status: Production-Ready
|
| Integrated with: Unified Consciousness CLI (Micro Level)
|
| """
|
|
|
| from typing import Dict, List, Optional, Any, Tuple
|
| from dataclasses import dataclass, field, asdict
|
| import logging
|
| from datetime import datetime
|
| from collections import deque
|
| import json
|
| import hashlib
|
| import numpy as np
|
| import asyncio
|
|
|
| from abc import ABC, abstractmethod
|
| from enum import Enum
|
| from typing import Dict as TypingDict
|
| from collections import deque
|
| from typing import Optional
|
|
|
|
|
| from unified_consciousness_cli import CLICommand, CLIResult, get_unified_cli
|
|
|
|
|
@dataclass
class SubconsciousOutput:
    """Output from subconscious processing (O_S).

    Plain value object: three required fields followed by four optional
    ones with defaults. ``metadata`` gets a fresh dict per instance.
    """

    # Required fields.
    module_name: str
    content: Dict[str, Any]
    timestamp: float  # producers in this file pass datetime.now().timestamp()

    # Optional fields with defaults.
    priority: float = 0.5
    emotional_valence: float = 0.0
    confidence: float = 0.8
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return every field as a plain dictionary (recursive copy via ``asdict``)."""
        as_mapping: Dict[str, Any] = asdict(self)
        return as_mapping
|
|
|
|
|
class ModuleType(Enum):
    """Closed set of module categories; each member's value is its lowercase name."""

    MEMORY = "memory"
    ANALYSIS = "analysis"
    AWARENESS = "awareness"
    CONSCIOUSNESS = "consciousness"
|
|
|
|
|
class SubconsciousAgent(ABC):
    """Minimal base class for subconscious agents.

    Subclasses implement ``process``; callers normally go through
    ``activate``, which also records the result.
    """

    def __init__(self, name: str, module_type: ModuleType):
        """Store the agent's identity and set up empty activation tracking."""
        self.name = name
        self.module_type = module_type
        # Only the 100 most recent activations are kept.
        self.activation_history = deque(maxlen=100)
        self.last_output: Optional[SubconsciousOutput] = None

    @abstractmethod
    async def process(self, input_data: Dict[str, Any]) -> SubconsciousOutput:
        """Process input and generate subconscious output."""

    async def activate(self, input_data: Dict[str, Any]) -> SubconsciousOutput:
        """Run ``process`` on *input_data*, remember the result, and return it.

        The result is stored in ``last_output`` and a
        ``{'timestamp', 'output'}`` entry is appended to
        ``activation_history``.
        """
        result = await self.process(input_data)
        self.last_output = result

        history_entry = {
            'timestamp': datetime.now().timestamp(),
            'output': result.to_dict(),
        }
        self.activation_history.append(history_entry)
        return result
|
|
|
# NOTE(review): this re-declares the ModuleType enum that already appears
# earlier in this file with the same members. After import, the name
# ModuleType is bound to this later class object.
class ModuleType(Enum):
    """Closed set of module categories; each member's value is its lowercase name."""

    MEMORY = "memory"
    ANALYSIS = "analysis"
    AWARENESS = "awareness"
    CONSCIOUSNESS = "consciousness"
|
|
|
|
|
# Configure the root logger once at import time (INFO level, timestamped
# "<time> - <logger name> - <level> - <message>" records), then create this
# module's logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

logger = logging.getLogger(__name__)
|
|
|
|
|
@dataclass
class MemoryRecord:
    """Represents a single memory record with qualia tagging."""

    # Identity and payload (required).
    record_id: str
    memory_type: str
    content: Dict[str, Any]

    # Optional metadata.
    qualia_tag: Optional[Dict[str, float]] = None
    timestamp: datetime = field(default_factory=datetime.now)  # creation time
    context: Optional[str] = None
    retrieval_count: int = 0
    importance_score: float = 0.5

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dictionary.

        ``timestamp`` is rendered as an ISO-8601 string and
        ``importance_score`` is exposed under the key ``'importance'``.
        """
        serialized: Dict[str, Any] = dict(
            record_id=self.record_id,
            memory_type=self.memory_type,
            content=self.content,
            qualia_tag=self.qualia_tag,
            timestamp=self.timestamp.isoformat(),
            context=self.context,
            retrieval_count=self.retrieval_count,
            importance=self.importance_score,
        )
        return serialized

    def compute_hash(self) -> str:
        """Return the SHA-256 hex digest of ``content``.

        The content is serialised as key-sorted JSON first, so dictionaries
        that differ only in key order hash identically. Values JSON cannot
        encode natively are passed through ``str``.
        """
        canonical = json.dumps(self.content, default=str, sort_keys=True)
        digest = hashlib.sha256()
        digest.update(canonical.encode())
        return digest.hexdigest()
|
|
|
|
|
class MemoryStore(SubconsciousAgent):
    """
    Hierarchical memory persistence with qualia tagging.

    Maintains episodic memories (specific events) and semantic memories
    (general knowledge) in two bounded deques, and keeps ``memory_index``
    (record ID -> record) in step with what those deques still hold.
    """

    def __init__(self, max_episodic: int = 1000, max_semantic: int = 500):
        """
        Initialize memory store.

        Args:
            max_episodic: Maximum episodic memory capacity
            max_semantic: Maximum semantic memory capacity
        """
        super().__init__("Memory", ModuleType.MEMORY)

        self.episodic_memory = deque(maxlen=max_episodic)
        self.semantic_memory = deque(maxlen=max_semantic)
        self.memory_index: Dict[str, MemoryRecord] = {}

        self.consolidation_count = 0
        self.consolidation_history = deque(maxlen=100)

        logger.info(f"Initialized MemoryStore (episodic={max_episodic}, semantic={max_semantic})")

        self.cli_registered = False
        self._register_with_cli()

    def _register_with_cli(self) -> None:
        """Register this memory store with the MicroCLI for command processing.

        Walks ``sub_clis`` through the "sub" -> "mini" -> "micro" levels of
        the unified CLI. Registration is best-effort: a missing level or any
        exception leaves ``cli_registered`` False.
        """
        try:
            node = get_unified_cli()
            for level in ("sub", "mini", "micro"):
                node = node.sub_clis.get(level)
                if not node:
                    # Step through one level at a time so a missing level is
                    # reported as such instead of raising AttributeError on None.
                    logger.debug(f"CLI level '{level}' not available; skipping registration")
                    return
            node.register_agent("memory", self.handle_cli_command)
            self.cli_registered = True
            logger.info("✅ MemoryStore registered with MicroCLI")
        except Exception as e:
            logger.warning(f"Failed to register with CLI: {e}")

    def handle_cli_command(self, command: str) -> str:
        """
        Handle CLI commands for memory operations.

        Supported commands:
        - status - Get memory store status
        - store_episodic <content_json> - Store episodic memory
        - store_semantic <content_json> - Store semantic memory
        - retrieve <record_id> - Retrieve specific memory
        - search <query> - Search memories
        - consolidate - Trigger memory consolidation

        Only the subcommand word is case-folded; the argument text (JSON
        payload or search query) is passed on exactly as typed.

        Returns:
            A human-readable status line. Errors are reported in the returned
            string rather than raised.
        """
        try:
            # Split off the subcommand only, so JSON keys/values keep their
            # case and internal whitespace.
            parts = command.split(maxsplit=1)
            if not parts:
                return "Usage: <command> [args]"

            subcommand = parts[0].lower()
            argument = parts[1].strip() if len(parts) > 1 else ""

            if subcommand == "status":
                episodic_count = len(self.episodic_memory)
                semantic_count = len(self.semantic_memory)
                total_memories = len(self.memory_index)
                return (
                    f"MemoryStore: Active - {episodic_count} episodic, "
                    f"{semantic_count} semantic, {total_memories} total memories"
                )

            if subcommand in ("store_episodic", "store_semantic") and argument:
                try:
                    content = json.loads(argument)
                except json.JSONDecodeError as e:
                    return f"MemoryStore: Invalid JSON content: {e}"
                # The store methods expect a dictionary; reject arrays/scalars early.
                if not isinstance(content, dict):
                    return "MemoryStore: Invalid JSON content: expected a JSON object"
                if subcommand == "store_episodic":
                    record_id = self.store_episodic(content)
                    return f"MemoryStore: Stored episodic memory: {record_id}"
                record_id = self.store_semantic(content)
                return f"MemoryStore: Stored semantic memory: {record_id}"

            if subcommand == "retrieve" and argument:
                # IDs generated by this class are lowercase, so case-folding
                # the user's input keeps lookups forgiving.
                record_id = argument.split()[0].lower()
                record = self.memory_index.get(record_id)
                if record is None:
                    return f"MemoryStore: Memory record not found: {record_id}"
                return f"MemoryStore: Retrieved {record.memory_type} memory: {record.content}"

            if subcommand == "search" and argument:
                results = self.retrieve(argument)
                header = f"MemoryStore: Search results for '{argument}': {len(results)} matches"
                previews = [
                    f"\n {record.record_id}: {record.memory_type} - {str(record.content)[:30]}..."
                    for record in results[:3]
                ]
                return header + "".join(previews)

            if subcommand == "consolidate":
                try:
                    created = self.consolidate_episodic_to_semantic()
                except Exception as e:
                    return f"MemoryStore: Consolidation failed: {e}"
                return f"MemoryStore: Memory consolidation complete: {created} semantic records created"

            return (
                f"MemoryStore: Unknown command '{subcommand}'. Available: status, "
                "store_episodic, store_semantic, retrieve, search, consolidate"
            )

        except Exception as e:
            return f"MemoryStore: Error processing command: {e}"

    async def process(self, input_data: Dict[str, Any]) -> SubconsciousOutput:
        """
        Process input as subconscious agent - store memory and return status.

        Required for SubconsciousAgent interface.

        Keys read from ``input_data``: ``operation`` (``'store_episodic'``
        by default, or ``'store_semantic'`` / ``'retrieve'``), ``content``,
        ``context``, ``qualia_tag`` and, for retrieval, ``record_id``.

        Returns:
            SubconsciousOutput whose ``content`` holds the operation result or
            an ``'error'`` entry. Exceptions are logged and reported through a
            zero-confidence output instead of propagating.
        """
        try:
            operation = input_data.get('operation', 'store_episodic')
            content = input_data.get('content', {})
            context = input_data.get('context')
            qualia_tag = input_data.get('qualia_tag')

            if operation == 'store_episodic':
                record_id = self.store_episodic(content, context, qualia_tag)
                result_content = {'stored_record_id': record_id, 'type': 'episodic'}
            elif operation == 'store_semantic':
                # Forward context and qualia tag exactly like the episodic branch.
                record_id = self.store_semantic(content, context, qualia_tag)
                result_content = {'stored_record_id': record_id, 'type': 'semantic'}
            elif operation == 'retrieve':
                record = self.memory_index.get(input_data.get('record_id', ''))
                if record is not None:
                    result_content = record.to_dict()
                else:
                    result_content = {'error': 'Record not found'}
            else:
                result_content = {'error': f'Unknown operation: {operation}'}

            return SubconsciousOutput(
                module_name=self.name,
                content=result_content,
                timestamp=datetime.now().timestamp(),
                priority=0.8,
                emotional_valence=0.0,
                confidence=0.8,
                metadata={'qualia_vector': [0.7] * 10, 'consciousness_level': 3}
            )

        except Exception as e:
            logger.error(f"Memory processing error: {e}")
            return SubconsciousOutput(
                module_name=self.name,
                content={'error': str(e)},
                timestamp=datetime.now().timestamp(),
                priority=0.0,
                emotional_valence=0.0,
                confidence=0.0,
                metadata={'qualia_vector': [0.3] * 10, 'consciousness_level': 1}
            )

    def _store_record(self, target: deque, id_prefix: str, memory_type: str,
                      content: Dict[str, Any], context: Optional[str],
                      qualia_tag: Optional[Dict[str, float]],
                      importance_score: float) -> str:
        """Create a record, append it to *target*, and keep ``memory_index`` consistent."""
        record_id = f"{id_prefix}_{len(target)}_{datetime.now().timestamp()}"

        record = MemoryRecord(
            record_id=record_id,
            memory_type=memory_type,
            content=content,
            qualia_tag=qualia_tag,
            context=context,
            importance_score=importance_score
        )

        # A full deque(maxlen=...) silently discards its oldest item on
        # append. Remove that item's index entry too, otherwise the index
        # grows without bound and keeps serving evicted records.
        if target.maxlen is not None and target and len(target) >= target.maxlen:
            self.memory_index.pop(target[0].record_id, None)

        target.append(record)
        self.memory_index[record_id] = record

        logger.debug(f"Stored {memory_type} memory: {record_id}")
        return record_id

    def store_episodic(self, content: Dict[str, Any], context: Optional[str] = None,
                       qualia_tag: Optional[Dict[str, float]] = None) -> str:
        """
        Store an episodic memory (specific event).

        Args:
            content: Memory content dictionary
            context: Optional context description
            qualia_tag: Optional phenomenal experience metadata

        Returns:
            Memory record ID
        """
        return self._store_record(
            self.episodic_memory, "episodic", 'episodic',
            content, context, qualia_tag,
            self._compute_importance(content)
        )

    def store_semantic(self, content: Dict[str, Any], context: Optional[str] = None,
                       qualia_tag: Optional[Dict[str, float]] = None) -> str:
        """
        Store a semantic memory (general knowledge).

        Args:
            content: Memory content dictionary
            context: Optional context description
            qualia_tag: Optional phenomenal experience metadata

        Returns:
            Memory record ID
        """
        return self._store_record(
            self.semantic_memory, "semantic", 'semantic',
            content, context, qualia_tag,
            self._compute_importance(content)
        )

    def store_experience(self, experience: Dict[str, Any], context: Optional[str] = None,
                         qualia_tag: Optional[Dict[str, float]] = None) -> str:
        """Store an episodic experience with rich contextual qualia tagging.

        The record is appended to ``episodic_memory`` with memory type
        ``'experiential'``; its importance also weighs in the qualia tag.
        """
        return self._store_record(
            self.episodic_memory, "experience", 'experiential',
            experience, context, qualia_tag,
            self._compute_experience_importance(experience, qualia_tag)
        )

    def retrieve_experiential_context(self, query: Optional[str] = None,
                                      emotion_filter: Optional[Dict[str, float]] = None,
                                      limit: int = 10) -> List[MemoryRecord]:
        """Retrieve experiences with context tags and optional emotion filtering.

        Args:
            query: Case-insensitive substring that must occur in the record's
                JSON-serialised content or in its context; falsy means no
                text filtering.
            emotion_filter: Optional bounds under the keys ``min_valence``,
                ``max_valence``, ``min_arousal``, ``max_arousal``. Applied
                only to records that carry a qualia tag; untagged records
                pass through.
            limit: Maximum number of records to return.

        Returns:
            Up to ``limit`` episodic-store records, most important first
            (newest first on ties). ``retrieval_count`` is incremented on
            each returned record.
        """
        query_lower = query.lower() if query else None
        results = []
        for record in list(self.episodic_memory):
            if query_lower:
                # default=str keeps non-JSON-native values searchable instead
                # of raising TypeError.
                in_content = query_lower in json.dumps(record.content, default=str).lower()
                in_context = bool(record.context) and query_lower in record.context.lower()
                if not (in_content or in_context):
                    continue
            if emotion_filter and record.qualia_tag:
                valence = record.qualia_tag.get('valence', 0.5)
                arousal = record.qualia_tag.get('arousal', 0.5)
                if ('min_valence' in emotion_filter and valence < emotion_filter['min_valence']) or \
                        ('max_valence' in emotion_filter and valence > emotion_filter['max_valence']):
                    continue
                if ('min_arousal' in emotion_filter and arousal < emotion_filter['min_arousal']) or \
                        ('max_arousal' in emotion_filter and arousal > emotion_filter['max_arousal']):
                    continue
            results.append(record)

        results.sort(key=lambda x: (-x.importance_score, -x.timestamp.timestamp()))
        selected = results[:limit]
        for record in selected:
            record.retrieval_count += 1
        return selected

    def tag_experiential_context(self, record_id: str, tags: Dict[str, float]) -> bool:
        """Update qualia tags for an existing experience record.

        Returns:
            True when the record exists and was updated, False otherwise.
        """
        record = self.memory_index.get(record_id)
        if not record:
            return False
        # Build a new dict rather than updating in place, so a tag dictionary
        # the caller passed at store time is not mutated behind their back.
        record.qualia_tag = {**(record.qualia_tag or {}), **tags}
        record.importance_score = self._compute_experience_importance(record.content, record.qualia_tag)
        logger.debug(f"Updated qualia tags for {record_id}")
        return True

    def get_contextual_memory_summary(self) -> Dict[str, Any]:
        """Get summary statistics for the experiential cache with qualia weights.

        Averages are taken over both stores; ``total_experiences`` counts the
        episodic store only.
        """
        all_records = list(self.episodic_memory) + list(self.semantic_memory)
        qualia_records = [r for r in all_records if r.qualia_tag]
        # Same per-key averaging as consolidation (missing keys count as 0.0).
        avg_qualia = self._average_qualia_tags(all_records) or {}
        return {
            'total_experiences': len(self.episodic_memory),
            'qualia_tagged_experiences': len(qualia_records),
            'average_qualia': avg_qualia,
            'average_importance': float(np.mean([r.importance_score for r in all_records])) if all_records else 0.0
        }

    def retrieve(self, query: Optional[str] = None, limit: int = 10,
                 memory_type: Optional[str] = None) -> List[MemoryRecord]:
        """
        Retrieve memories matching query.

        Args:
            query: Optional search query
            limit: Maximum number of memories to return
            memory_type: Filter by store ('episodic', 'semantic', or None for
                both). 'episodic' covers everything in the episodic store,
                including records added by ``store_experience``.

        Returns:
            List of MemoryRecord objects. Without a query: newest first, and
            ``retrieval_count`` is left untouched. With a query: each matching
            record appears once, most important first (newest first on ties),
            and ``retrieval_count`` is incremented on each returned record.
        """
        candidates = []
        if memory_type in [None, 'episodic']:
            candidates.extend(self.episodic_memory)
        if memory_type in [None, 'semantic']:
            candidates.extend(self.semantic_memory)

        if not query:
            return sorted(candidates, key=lambda x: x.timestamp, reverse=True)[:limit]

        query_lower = query.lower()
        matches = []
        for memory in candidates:
            in_content = query_lower in json.dumps(memory.content, default=str).lower()
            in_context = bool(memory.context) and query_lower in memory.context.lower()
            # One append per record even when both content and context match,
            # so results carry no duplicates and counts are bumped once.
            if in_content or in_context:
                matches.append(memory)

        matches.sort(key=lambda x: (-x.importance_score, -x.timestamp.timestamp()))
        selected = matches[:limit]
        for memory in selected:
            memory.retrieval_count += 1
        return selected

    def consolidate_episodic_to_semantic(self) -> int:
        """
        Consolidate episodic memories to semantic memories.

        Groups episodic records by context ("general" when unset) and, for
        every group holding at least three records, stores one semantic
        record summarising the group's shared content keys and averaged
        qualia tags.

        Returns:
            Number of new semantic memories created
        """
        if not self.episodic_memory:
            return 0

        context_groups: Dict[str, List[MemoryRecord]] = {}
        for memory in self.episodic_memory:
            context_groups.setdefault(memory.context or "general", []).append(memory)

        new_semantic_count = 0
        for context, memories in context_groups.items():
            if len(memories) < 3:
                continue

            semantic_content = {
                'type': 'consolidation',
                'source_context': context,
                'source_count': len(memories),
                'consolidated_at': datetime.now().isoformat(),
                'key_patterns': self._extract_patterns(memories)
            }

            self.store_semantic(
                content=semantic_content,
                context=f"Consolidated from {context}",
                qualia_tag=self._average_qualia_tags(memories)
            )
            new_semantic_count += 1

        self.consolidation_count += 1
        self.consolidation_history.append({
            'timestamp': datetime.now().isoformat(),
            'new_semantic': new_semantic_count,
            'contexts_processed': len(context_groups)
        })

        logger.info(f"Consolidation complete: {new_semantic_count} new semantic memories created")

        return new_semantic_count

    def _compute_importance(self, content: Dict[str, Any]) -> float:
        """Compute importance score for a memory.

        Starts at 0.5, adds 0.3 x ``emotional_intensity`` and
        0.2 x ``surprise_factor`` when present, and clamps to [0.0, 1.0].
        """
        importance = 0.5

        if 'emotional_intensity' in content:
            importance += 0.3 * content['emotional_intensity']

        if 'surprise_factor' in content:
            importance += 0.2 * content['surprise_factor']

        return min(1.0, max(0.0, importance))

    def _compute_experience_importance(self, content: Dict[str, Any],
                                       qualia_tag: Optional[Dict[str, float]]) -> float:
        """Compute importance score for an experience, weighted by qualia metadata.

        Adds 0.15 x intensity, 0.1 x |valence - 0.5| and 0.1 x salience to
        the base importance, clamped to [0.0, 1.0].
        """
        importance = self._compute_importance(content)
        if qualia_tag:
            importance += 0.15 * qualia_tag.get('intensity', 0.0)
            importance += 0.1 * abs(qualia_tag.get('valence', 0.5) - 0.5)
            importance += 0.1 * qualia_tag.get('salience', 0.0)
        return min(1.0, max(0.0, importance))

    def _extract_patterns(self, memories: List[MemoryRecord]) -> List[str]:
        """Extract patterns from a group of memories.

        Returns ``"shared_<key>"`` for every content key present in all
        memories, or an empty list for groups of fewer than three.
        """
        if len(memories) <= 2:
            return []

        common_keys = set(memories[0].content.keys())
        for mem in memories[1:]:
            common_keys.intersection_update(mem.content.keys())

        # Sorted so the output is deterministic; key=str tolerates
        # non-string keys.
        return [f"shared_{key}" for key in sorted(common_keys, key=str)]

    def _average_qualia_tags(self, memories: List[MemoryRecord]) -> Optional[Dict[str, float]]:
        """Average qualia tags across memories.

        Only memories with a non-empty tag take part; a key missing from
        one of those tags counts as 0.0. Returns None when no memory is tagged.
        """
        qualia_tags = [m.qualia_tag for m in memories if m.qualia_tag]
        if not qualia_tags:
            return None

        all_keys = set()
        for tag in qualia_tags:
            all_keys.update(tag.keys())

        return {
            key: float(np.mean([tag.get(key, 0.0) for tag in qualia_tags]))
            for key in all_keys
        }

    def get_memory_statistics(self) -> Dict[str, Any]:
        """Get memory system statistics."""
        indexed = list(self.memory_index.values())
        return {
            'episodic_count': len(self.episodic_memory),
            'semantic_count': len(self.semantic_memory),
            'total_memories': len(self.episodic_memory) + len(self.semantic_memory),
            'consolidations': self.consolidation_count,
            'index_size': len(self.memory_index),
            'total_retrievals': sum(m.retrieval_count for m in indexed),
            'avg_importance': float(np.mean([m.importance_score for m in indexed])) if indexed else 0.0
        }
|
|
|
|
|
class ContextualContinuityEngine:
    """Strengthens experiential caching with qualia-weighted tagging for natural flow."""

    def __init__(self, memory_store: MemoryStore):
        """Bind the engine to *memory_store* and start every flow modulator at 0.5."""
        self.memory_store = memory_store
        self.continuity_context = {}
        self.flow_modulators = dict.fromkeys(
            ('analytical', 'spontaneous', 'creative', 'empathetic'), 0.5
        )

    def update_contextual_flow(self, current_interaction: Dict[str, Any]) -> Dict[str, Any]:
        """Update continuity context and modulate flow based on past experiences.

        Reads ``topic`` and ``emotional_tone`` from *current_interaction*,
        looks up matching experiences in the memory store, recomputes the
        flow modulators and records the outcome in ``continuity_context``.

        Returns:
            Dictionary with ``continuity_weights``, ``modulated_style`` (a
            copy of the modulators) and ``relevant_experiences_count``.
        """
        topic_query = current_interaction.get('topic', '')
        tone_filter = self._extract_emotion_filter(current_interaction)
        recalled = self.memory_store.retrieve_experiential_context(
            query=topic_query,
            emotion_filter=tone_filter,
        )

        weights = self._compute_continuity_weights(recalled)
        self._modulate_flow_style(weights, current_interaction)

        context_update = {
            'last_topic': current_interaction.get('topic'),
            'emotional_tone': current_interaction.get('emotional_tone', 0.5),
            'trust_level': weights.get('trust_accumulation', 0.5),
            'flow_style': self.flow_modulators.copy(),
        }
        self.continuity_context.update(context_update)

        return {
            'continuity_weights': weights,
            'modulated_style': self.flow_modulators.copy(),
            'relevant_experiences_count': len(recalled),
        }

    def _extract_emotion_filter(self, interaction: Dict[str, Any]) -> Optional[Dict[str, float]]:
        """Extract emotion filter from current interaction.

        A tone above 0.6 yields ``{'min_valence': 0.4}``, a tone below 0.4
        yields ``{'max_valence': 0.6}``, anything in between yields None.
        """
        tone = interaction.get('emotional_tone', 0.5)
        if tone > 0.6:
            return {'min_valence': 0.4}
        if tone < 0.4:
            return {'max_valence': 0.6}
        return None

    def _compute_continuity_weights(self, experiences: List[MemoryRecord]) -> Dict[str, float]:
        """Compute weights for continuity based on experiences.

        Trust and resonance are averaged over experiences that carry a
        qualia tag (0.5 when a tag lacks the key); every weight falls back
        to 0.5 when there is nothing to average.
        """
        if not experiences:
            return {'trust_accumulation': 0.5, 'emotional_resonance': 0.5, 'contextual_relevance': 0.5}

        tags = [exp.qualia_tag for exp in experiences if exp.qualia_tag]
        trust_values = [tag.get('trust', 0.5) for tag in tags]
        resonance_values = [tag.get('resonance', 0.5) for tag in tags]
        # NOTE(review): indentation was lost in the source; importance is
        # assumed to be collected for every experience, tagged or not — confirm.
        relevance_values = [exp.importance_score for exp in experiences]

        def _mean_or_neutral(values):
            # Neutral 0.5 when there is nothing to average.
            return np.mean(values) if values else 0.5

        return {
            'trust_accumulation': _mean_or_neutral(trust_values),
            'emotional_resonance': _mean_or_neutral(resonance_values),
            'contextual_relevance': _mean_or_neutral(relevance_values),
        }

    def _modulate_flow_style(self, weights: Dict[str, float], interaction: Dict[str, Any]):
        """Modulate expressive style based on continuity weights.

        Each modulator is a fixed linear blend of trust, resonance and
        relevance, clamped to [0.0, 1.0]. *interaction* is accepted but
        not read.
        """
        trust = weights.get('trust_accumulation', 0.5)
        resonance = weights.get('emotional_resonance', 0.5)
        relevance = weights.get('contextual_relevance', 0.5)

        def _clamp(value):
            return min(1.0, max(0.0, value))

        modulators = self.flow_modulators
        modulators['analytical'] = _clamp(relevance * 0.8 + trust * 0.2)
        modulators['spontaneous'] = _clamp((1.0 - relevance) * 0.6 + resonance * 0.4)
        modulators['creative'] = _clamp(resonance * 0.7 + (1.0 - trust) * 0.3)
        modulators['empathetic'] = _clamp(trust * 0.9 + resonance * 0.1)
|
|
|
|
|
class MetaLearningFramework:
    """
    Framework for recursive self-improvement and adaptive learning.

    Enables the system to learn from experience, update internal models,
    and suggest self-improvements based on introspection.
    """

    def __init__(self):
        """Initialize meta-learning framework."""
        # Bounded logs: the last 500 experiences and the last 100 suggestions.
        self.performance_history = deque(maxlen=500)
        self.improvement_suggestions = deque(maxlen=100)

        self.learning_metrics = dict(
            total_experiences=0,
            successful_episodes=0,
            failed_episodes=0,
            learning_rate=0.01,
        )

        self.adaptive_parameters = dict(
            consciousness_sensitivity=0.5,
            embodiment_integration=0.6,
            ethical_strictness=0.7,
            autonomy_level=0.5,
            learning_speed=0.01,
        )

        logger.info("Initialized MetaLearningFramework")

    def record_experience(self, experience: Dict[str, Any]) -> None:
        """
        Record a learning experience.

        Args:
            experience: Experience dictionary with outcome and metrics.
                Keys read: ``success``, ``reward``, ``error``, ``action``,
                ``outcome``, ``context``.
        """
        succeeded = experience.get('success', False)
        reward = experience.get('reward', 0.0)
        error = experience.get('error', 0.0)

        self.performance_history.append({
            'timestamp': datetime.now().isoformat(),
            'success': succeeded,
            'reward': reward,
            'error': error,
            'action_taken': experience.get('action'),
            'outcome': experience.get('outcome'),
            'context': experience.get('context'),
        })

        metrics = self.learning_metrics
        metrics['total_experiences'] += 1
        outcome_key = 'successful_episodes' if succeeded else 'failed_episodes'
        metrics[outcome_key] += 1

        logger.debug(f"Experience recorded: success={succeeded}, reward={reward:.3f}")

    def update_adaptive_parameters(self) -> None:
        """
        Update adaptive parameters based on learning history.

        Does nothing until at least five experiences are recorded. Raises
        ``consciousness_sensitivity`` by 0.05 (capped at 1.0) when the
        success rate over the last ten experiences exceeds 70%, and grows
        ``learning_speed`` by 2% (capped at 0.1) once more than 100
        experiences are held.
        """
        history_size = len(self.performance_history)
        if history_size < 5:
            return

        window = list(self.performance_history)[-10:]
        wins = len([entry for entry in window if entry['success']])
        success_rate = wins / len(window)

        params = self.adaptive_parameters
        if success_rate > 0.7:
            params['consciousness_sensitivity'] = min(
                1.0, params['consciousness_sensitivity'] + 0.05
            )

        if history_size > 100:
            params['learning_speed'] = min(0.1, params['learning_speed'] * 1.02)

        logger.info(f"Parameters updated: success_rate={success_rate:.1%}")

    def suggest_improvements(self) -> List[str]:
        """
        Generate self-improvement suggestions based on learning.

        Looks at the last twenty experiences. The returned suggestions are
        also appended to ``improvement_suggestions``.

        Returns:
            List of improvement suggestions
        """
        suggestions: List[str] = []
        if not self.performance_history:
            return suggestions

        window = list(self.performance_history)[-20:]

        positive_errors = [entry['error'] for entry in window if entry.get('error', 0.0) > 0]
        if positive_errors and np.mean(positive_errors) > 0.5:
            suggestions += [
                "Increase consciousness depth for better decisions",
                "Review ethical constraints for potential misalignment",
            ]

        wins = len([entry for entry in window if entry['success']])
        if wins / len(window) < 0.5:
            suggestions += [
                "Enhance sensorimotor integration precision",
                "Increase embodiment-consciousness binding",
            ]

        if self.adaptive_parameters['learning_speed'] < 0.05:
            suggestions.append("Accelerate learning to improve faster")

        self.improvement_suggestions.extend(suggestions)
        return suggestions

    def get_learning_report(self) -> Dict[str, Any]:
        """Get comprehensive learning report.

        Returns ``{'status': 'no_experience'}`` while the history is empty.
        Success/failure counts and averages cover the retained history;
        ``total_experiences`` is the running counter.
        """
        if not self.performance_history:
            return {'status': 'no_experience'}

        history = list(self.performance_history)
        total = len(history)
        success_count = len([entry for entry in history if entry['success']])
        rewards = [entry['reward'] for entry in history]
        errors = [entry['error'] for entry in history]

        report: Dict[str, Any] = {}
        report['total_experiences'] = self.learning_metrics['total_experiences']
        report['successful'] = success_count
        report['failed'] = total - success_count
        report['success_rate'] = success_count / total if history else 0.0
        report['avg_reward'] = np.mean(rewards)
        report['avg_error'] = np.mean(errors)
        report['adaptive_parameters'] = self.adaptive_parameters.copy()
        report['recent_suggestions'] = list(self.improvement_suggestions)[-5:]
        return report
|
|
|
|
|
class IdentityPreservationSystem:
    """
    Monitors and preserves system identity across sessions and state changes.

    Ensures continuity of consciousness and value alignment despite changes
    to underlying parameters.
    """

    def __init__(self, identity_threshold: float = 0.8):
        """
        Initialize identity preservation system.

        Args:
            identity_threshold: Threshold for detecting identity drift (0-1)
        """
        self.identity_threshold = identity_threshold
        # Snapshots and drift measurements are bounded to the last 100 entries.
        self.identity_snapshots = deque(maxlen=100)
        self.drift_history = deque(maxlen=100)
        self.core_values: Dict[str, float] = {}
        # NOTE(review): unlike the deques above, this list of snapshot IDs is
        # never trimmed.
        self.identity_checkpoints = []

        logger.info(f"Initialized IdentityPreservationSystem (threshold={identity_threshold})")

    def snapshot_identity(self, consciousness_state: Dict[str, Any],
                          rho_metrics: Dict[str, float],
                          memory_hash: str) -> str:
        """
        Create a snapshot of current identity.

        Args:
            consciousness_state: Current consciousness parameters
            rho_metrics: RHO metrics (purpose, harmony, origin)
            memory_hash: Hash of current memory state

        Returns:
            Snapshot ID
        """
        snapshot_id = f"identity_{len(self.identity_snapshots)}_{datetime.now().timestamp()}"

        snapshot = {
            'snapshot_id': snapshot_id,
            'timestamp': datetime.now().isoformat(),
            # These two are stored as None when the caller omits them;
            # detect_drift treats None as "unknown" and uses its neutral default.
            'consciousness_level': consciousness_state.get('consciousness_level'),
            'awareness_score': consciousness_state.get('awareness_score'),
            'rho_metrics': rho_metrics,
            'memory_hash': memory_hash,
            'autonomy_level': consciousness_state.get('autonomy_level', 0.5)
        }

        self.identity_snapshots.append(snapshot)
        self.identity_checkpoints.append(snapshot_id)

        logger.debug(f"Identity snapshot: {snapshot_id}")

        return snapshot_id

    @staticmethod
    def _value_or_default(mapping: Dict[str, Any], key: str, default: float = 0.5) -> float:
        """Return ``mapping[key]``, or *default* when the key is absent or holds None."""
        value = mapping.get(key)
        return default if value is None else value

    def detect_drift(self, current_state: Dict[str, Any]) -> Tuple[float, List[str]]:
        """
        Detect identity drift from baseline.

        The baseline is the most recent snapshot. A consciousness-level
        change above 0.2 and any per-key RHO change above 0.3 count as drift
        factors; the score is the mean of those flagged changes. Missing or
        None values on either side are treated as 0.5. Every call with a
        baseline appends an entry to ``drift_history``.

        Args:
            current_state: Current consciousness and value state

        Returns:
            Tuple of (drift_score, drift_factors); ``(0.0, [])`` when no
            snapshot exists yet.
        """
        if not self.identity_snapshots:
            return 0.0, []

        baseline = self.identity_snapshots[-1]

        drift_factors: List[str] = []
        drift_metrics: List[float] = []

        # snapshot_identity may have stored None here, which a plain
        # .get(key, 0.5) would hand straight to the subtraction.
        consciousness_diff = abs(
            self._value_or_default(current_state, 'consciousness_level')
            - self._value_or_default(baseline, 'consciousness_level')
        )
        if consciousness_diff > 0.2:
            drift_factors.append(f"consciousness_change={consciousness_diff:.2f}")
            drift_metrics.append(consciousness_diff)

        if 'rho_metrics' in baseline and 'rho_metrics' in current_state:
            # Either side may hold None instead of a dict.
            rho_baseline = baseline['rho_metrics'] or {}
            rho_current = current_state['rho_metrics'] or {}

            for key in rho_baseline:
                diff = abs(
                    self._value_or_default(rho_baseline, key)
                    - self._value_or_default(rho_current, key)
                )
                if diff > 0.3:
                    drift_factors.append(f"rho_{key}_drift={diff:.2f}")
                    drift_metrics.append(diff)

        drift_score = float(np.mean(drift_metrics)) if drift_metrics else 0.0

        self.drift_history.append({
            'timestamp': datetime.now().isoformat(),
            'drift_score': drift_score,
            'factors': drift_factors
        })

        if drift_score > self.identity_threshold:
            logger.warning(f"Identity drift detected: {drift_score:.3f}")

        return drift_score, drift_factors

    def get_identity_report(self) -> Dict[str, Any]:
        """Get identity preservation report.

        Returns ``{'status': 'no_drift_data'}`` until ``detect_drift`` has
        recorded at least one measurement.
        """
        if not self.drift_history:
            return {'status': 'no_drift_data'}

        scores = [entry['drift_score'] for entry in self.drift_history]

        return {
            'snapshots': len(self.identity_snapshots),
            'checkpoints': len(self.identity_checkpoints),
            # Plain float, matching the drift_score type detect_drift returns.
            'avg_drift': float(np.mean(scores)),
            'max_drift': max(scores),
            'recent_drift': scores[-1],
            'drift_events': sum(1 for s in scores if s > self.identity_threshold),
            'last_snapshot': self.identity_checkpoints[-1] if self.identity_checkpoints else None
        }
|
|
|
|
|
|
|
| from typing import Tuple
|
|
|
if __name__ == '__main__':
    # Smoke demo: store one episodic and one semantic memory, print the
    # store statistics, then feed five alternating success/failure
    # experiences to the meta-learner and print its suggestions and report.
    print("=== Memory and Learning Systems ===\n")

    store = MemoryStore()

    episodic_id = store.store_episodic(
        content={'event': 'initialization', 'status': 'complete'},
        context='system_startup',
        qualia_tag={'clarity': 0.8, 'focus': 0.7},
    )
    semantic_id = store.store_semantic(
        content={'principle': 'consciousness_strengthens_ethics'},
        context='learned_principle',
    )

    print(f"Episodic: {episodic_id}")
    print(f"Semantic: {semantic_id}")
    stats_json = json.dumps(store.get_memory_statistics(), indent=2)
    print(f"Stats: {stats_json}")

    print("\nMeta-Learning:")
    learner = MetaLearningFramework()

    for step in range(5):
        # Even steps succeed, odd steps fail.
        succeeded = step % 2 == 0
        learner.record_experience({
            'action': f'action_{step}',
            'outcome': 'successful' if succeeded else 'failed',
            'success': succeeded,
            'reward': 0.8 if succeeded else -0.3,
            'error': 0.1 if succeeded else 0.5,
        })

    learner.update_adaptive_parameters()
    suggestions = learner.suggest_improvements()

    print(f"Suggestions: {suggestions}")
    report_json = json.dumps(learner.get_learning_report(), indent=2, default=str)
    print(f"Report: {report_json}")
|
|
|
| import json
|
|
|