# Syntelligence_ATC_Master_OS / memory_learning.py
# theNorms's picture
# Upload memory_learning.py
# 44e8fe3 verified
"""
Memory and Learning Systems Module
Implements hierarchical memory persistence with qualia tagging and meta-learning.
Version: 1.0.0
Status: Production-Ready
"""
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
import logging
from datetime import datetime
from collections import deque
import json
import hashlib
import numpy as np
# Configure logging: install the root handler/format at import time, then
# create the module-level logger every class in this file writes to.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
@dataclass
class MemoryRecord:
    """One stored memory plus its qualia metadata and bookkeeping counters."""

    record_id: str
    memory_type: str  # 'episodic' or 'semantic'
    content: Dict[str, Any]
    qualia_tag: Optional[Dict[str, float]] = None  # Phenomenal experience metadata
    timestamp: datetime = field(default_factory=datetime.now)
    context: Optional[str] = None
    retrieval_count: int = 0
    importance_score: float = 0.5  # 0-1 importance ranking

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the record.

        The timestamp is rendered with ``isoformat()`` and the importance
        score is exposed under the key ``'importance'``.
        """
        serialized: Dict[str, Any] = dict(
            record_id=self.record_id,
            memory_type=self.memory_type,
            content=self.content,
            qualia_tag=self.qualia_tag,
            timestamp=self.timestamp.isoformat(),
            context=self.context,
            retrieval_count=self.retrieval_count,
        )
        serialized['importance'] = self.importance_score
        return serialized

    def compute_hash(self) -> str:
        """Return the SHA-256 hex digest of the record's content.

        The content is serialised as key-sorted JSON (non-JSON values are
        stringified via ``default=str``) so equal content hashes equally.
        """
        canonical = json.dumps(self.content, sort_keys=True, default=str)
        digest = hashlib.sha256()
        digest.update(canonical.encode())
        return digest.hexdigest()
class MemoryStore:
    """
    Hierarchical memory persistence with qualia tagging.

    Maintains episodic memories (specific events) and semantic memories
    (general knowledge) in two bounded deques, plus ``memory_index`` for
    lookup by record ID. The index is kept in step with the deques: when a
    full deque drops its oldest record, that record is also removed from
    the index, so the index cannot grow without bound.
    """

    def __init__(self, max_episodic: int = 1000, max_semantic: int = 500):
        """
        Initialize memory store.

        Args:
            max_episodic: Maximum episodic memory capacity
            max_semantic: Maximum semantic memory capacity
        """
        self.episodic_memory = deque(maxlen=max_episodic)
        self.semantic_memory = deque(maxlen=max_semantic)
        self.memory_index: Dict[str, MemoryRecord] = {}  # Quick lookup by ID
        # Monotonic counter used in record IDs. len(deque) stops changing once
        # a deque is full, so it cannot be relied on to keep IDs distinct.
        self._record_sequence = 0
        # Consolidation tracking
        self.consolidation_count = 0
        self.consolidation_history = deque(maxlen=100)
        logger.info(f"Initialized MemoryStore (episodic={max_episodic}, semantic={max_semantic})")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _new_record_id(self, prefix: str) -> str:
        """Return a fresh ID of the form ``<prefix>_<sequence>_<timestamp>``."""
        sequence = self._record_sequence
        self._record_sequence += 1
        return f"{prefix}_{sequence}_{datetime.now().timestamp()}"

    def _remember(self, store: deque, record: MemoryRecord) -> None:
        """Append ``record`` to ``store`` and keep ``memory_index`` consistent.

        If ``store`` is already at capacity, appending silently drops its
        oldest entry, so that entry is removed from the index first.
        """
        if store.maxlen is not None and store and len(store) >= store.maxlen:
            self.memory_index.pop(store[0].record_id, None)
        store.append(record)
        if store.maxlen != 0:  # a zero-capacity deque retains nothing
            self.memory_index[record.record_id] = record

    def _store_record(self, store: deque, id_prefix: str, memory_type: str,
                      content: Dict[str, Any], context: Optional[str],
                      qualia_tag: Optional[Dict[str, float]],
                      importance: float) -> MemoryRecord:
        """Build a MemoryRecord, place it in ``store`` and index it."""
        record = MemoryRecord(
            record_id=self._new_record_id(id_prefix),
            memory_type=memory_type,
            content=content,
            qualia_tag=qualia_tag,
            context=context,
            importance_score=importance,
        )
        self._remember(store, record)
        return record

    @staticmethod
    def _matches_query(record: MemoryRecord, query_lower: str) -> bool:
        """Case-insensitive substring match against content or context.

        Content is serialised with ``default=str`` (as in
        ``MemoryRecord.compute_hash``) so non-JSON values such as datetimes
        are searchable instead of raising ``TypeError``.
        """
        if query_lower in json.dumps(record.content, default=str).lower():
            return True
        return bool(record.context) and query_lower in record.context.lower()

    @staticmethod
    def _passes_emotion_filter(qualia_tag: Dict[str, float],
                               emotion_filter: Dict[str, float]) -> bool:
        """Check valence/arousal against optional min_*/max_* bounds.

        Missing valence or arousal values default to 0.5.
        """
        for dimension in ('valence', 'arousal'):
            value = qualia_tag.get(dimension, 0.5)
            low_key, high_key = f'min_{dimension}', f'max_{dimension}'
            if low_key in emotion_filter and value < emotion_filter[low_key]:
                return False
            if high_key in emotion_filter and value > emotion_filter[high_key]:
                return False
        return True

    @staticmethod
    def _rank_and_count(records: List[MemoryRecord], limit: int) -> List[MemoryRecord]:
        """Sort by importance then recency, bump retrieval counts, return top ``limit``."""
        ranked = sorted(records, key=lambda r: (-r.importance_score, -r.timestamp.timestamp()))
        selected = ranked[:limit]
        for record in selected:
            record.retrieval_count += 1
        return selected

    # ------------------------------------------------------------------
    # Storing
    # ------------------------------------------------------------------
    def store_episodic(self, content: Dict[str, Any], context: Optional[str] = None,
                       qualia_tag: Optional[Dict[str, float]] = None) -> str:
        """
        Store an episodic memory (specific event).

        Args:
            content: Memory content dictionary
            context: Optional context description
            qualia_tag: Optional phenomenal experience metadata

        Returns:
            Memory record ID
        """
        record = self._store_record(
            self.episodic_memory, 'episodic', 'episodic',
            content, context, qualia_tag, self._compute_importance(content),
        )
        logger.debug(f"Stored episodic memory: {record.record_id}")
        return record.record_id

    def store_semantic(self, content: Dict[str, Any], context: Optional[str] = None,
                       qualia_tag: Optional[Dict[str, float]] = None) -> str:
        """
        Store a semantic memory (general knowledge).

        Args:
            content: Memory content dictionary
            context: Optional context description
            qualia_tag: Optional phenomenal experience metadata

        Returns:
            Memory record ID
        """
        record = self._store_record(
            self.semantic_memory, 'semantic', 'semantic',
            content, context, qualia_tag, self._compute_importance(content),
        )
        logger.debug(f"Stored semantic memory: {record.record_id}")
        return record.record_id

    def store_experience(self, experience: Dict[str, Any], context: Optional[str] = None,
                         qualia_tag: Optional[Dict[str, float]] = None) -> str:
        """Store an experience in episodic memory with qualia-weighted importance.

        The record is typed ``'experiential'`` and lives in the same deque as
        episodic memories.

        Returns:
            Memory record ID
        """
        record = self._store_record(
            self.episodic_memory, 'experience', 'experiential',
            experience, context, qualia_tag,
            self._compute_experience_importance(experience, qualia_tag),
        )
        logger.debug(f"Stored experiential memory: {record.record_id}")
        return record.record_id

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------
    def retrieve_experiential_context(self, query: Optional[str] = None,
                                      emotion_filter: Optional[Dict[str, float]] = None,
                                      limit: int = 10) -> List[MemoryRecord]:
        """Retrieve episodic/experiential records, optionally filtered.

        Args:
            query: Case-insensitive substring to look for in content or context;
                falsy means no text filter.
            emotion_filter: Optional bounds keyed ``min_valence``, ``max_valence``,
                ``min_arousal``, ``max_arousal``. Records without a qualia tag
                are not excluded by this filter.
            limit: Maximum number of records to return.

        Returns:
            Matching records ordered by importance, then recency. The
            retrieval count of each returned record is incremented.
        """
        needle = query.lower() if query else None
        results = []
        for record in self.episodic_memory:
            if needle is not None and not self._matches_query(record, needle):
                continue
            if (emotion_filter and record.qualia_tag
                    and not self._passes_emotion_filter(record.qualia_tag, emotion_filter)):
                continue
            results.append(record)
        return self._rank_and_count(results, limit)

    def tag_experiential_context(self, record_id: str, tags: Dict[str, float]) -> bool:
        """Merge ``tags`` into a record's qualia tag and recompute its importance.

        Returns:
            False when ``record_id`` is not in the index, True otherwise.
        """
        record = self.memory_index.get(record_id)
        if record is None:
            return False
        if not record.qualia_tag:
            record.qualia_tag = {}
        record.qualia_tag.update(tags)
        record.importance_score = self._compute_experience_importance(record.content, record.qualia_tag)
        logger.debug(f"Updated qualia tags for {record_id}")
        return True

    def get_contextual_memory_summary(self) -> Dict[str, Any]:
        """Get summary statistics for the experiential cache with qualia weights.

        ``total_experiences`` counts the episodic deque only; the qualia and
        importance figures are computed over episodic and semantic records.
        """
        all_records = list(self.episodic_memory) + list(self.semantic_memory)
        tagged_count = sum(1 for r in all_records if r.qualia_tag)
        if all_records:
            average_importance = float(np.mean([r.importance_score for r in all_records]))
        else:
            average_importance = 0.0
        return {
            'total_experiences': len(self.episodic_memory),
            'qualia_tagged_experiences': tagged_count,
            'average_qualia': self._average_qualia_tags(all_records) or {},
            'average_importance': average_importance,
        }

    def retrieve(self, query: Optional[str] = None, limit: int = 10,
                 memory_type: Optional[str] = None) -> List[MemoryRecord]:
        """
        Retrieve memories matching query.

        Args:
            query: Optional search query
            limit: Maximum number of memories to return
            memory_type: Filter by type ('episodic', 'semantic', or None for both)

        Returns:
            List of MemoryRecord objects. Without a query the most recent
            records are returned and retrieval counts are left untouched.
            With a query, each matching record appears once, ordered by
            importance then recency, and its retrieval count is incremented.
        """
        # Collect candidate memories
        candidates: List[MemoryRecord] = []
        if memory_type in (None, 'episodic'):
            candidates.extend(self.episodic_memory)
        if memory_type in (None, 'semantic'):
            candidates.extend(self.semantic_memory)

        # If no query, return most recent
        if not query:
            return sorted(candidates, key=lambda r: r.timestamp, reverse=True)[:limit]

        # A record matching on both content and context must be listed only
        # once, otherwise it is returned twice and counted twice.
        needle = query.lower()
        matches = [memory for memory in candidates if self._matches_query(memory, needle)]
        return self._rank_and_count(matches, limit)

    # ------------------------------------------------------------------
    # Consolidation
    # ------------------------------------------------------------------
    def consolidate_episodic_to_semantic(self) -> int:
        """
        Consolidate episodic memories to semantic memories.

        Groups episodic records by context (``"general"`` when unset) and
        stores one semantic summary for every group of three or more.

        Returns:
            Number of new semantic memories created
        """
        if not self.episodic_memory:
            return 0

        # Group episodic memories by context
        context_groups: Dict[str, List[MemoryRecord]] = {}
        for memory in self.episodic_memory:
            context_groups.setdefault(memory.context or "general", []).append(memory)

        # Create semantic summaries
        new_semantic_count = 0
        for context, memories in context_groups.items():
            if len(memories) < 3:  # Only consolidate if 3+ related memories
                continue
            semantic_content = {
                'type': 'consolidation',
                'source_context': context,
                'source_count': len(memories),
                'consolidated_at': datetime.now().isoformat(),
                'key_patterns': self._extract_patterns(memories)
            }
            self.store_semantic(
                content=semantic_content,
                context=f"Consolidated from {context}",
                qualia_tag=self._average_qualia_tags(memories)
            )
            new_semantic_count += 1

        self.consolidation_count += 1
        self.consolidation_history.append({
            'timestamp': datetime.now().isoformat(),
            'new_semantic': new_semantic_count,
            'contexts_processed': len(context_groups)
        })
        logger.info(f"Consolidation complete: {new_semantic_count} new semantic memories created")
        return new_semantic_count

    # ------------------------------------------------------------------
    # Scoring and aggregation
    # ------------------------------------------------------------------
    def _compute_importance(self, content: Dict[str, Any]) -> float:
        """Compute an importance score in [0, 1] from content features.

        Starts at 0.5 and adds 0.3 * ``emotional_intensity`` and
        0.2 * ``surprise_factor`` when those keys are present.
        """
        importance = 0.5
        if 'emotional_intensity' in content:
            importance += 0.3 * content['emotional_intensity']
        if 'surprise_factor' in content:
            importance += 0.2 * content['surprise_factor']
        return min(1.0, max(0.0, importance))

    def _compute_experience_importance(self, content: Dict[str, Any],
                                       qualia_tag: Optional[Dict[str, float]]) -> float:
        """Compute importance score for an experience, weighted by qualia metadata."""
        importance = self._compute_importance(content)
        if qualia_tag:
            importance += 0.15 * qualia_tag.get('intensity', 0.0)
            # Valence contributes by its distance from the neutral value 0.5.
            importance += 0.1 * abs(qualia_tag.get('valence', 0.5) - 0.5)
            importance += 0.1 * qualia_tag.get('salience', 0.0)
        return min(1.0, max(0.0, importance))

    def _extract_patterns(self, memories: List[MemoryRecord]) -> List[str]:
        """Return ``shared_<key>`` for each content key common to all memories.

        Groups of two or fewer memories yield no patterns.
        """
        if len(memories) <= 2:
            return []
        common_keys = set(memories[0].content.keys())
        for mem in memories[1:]:
            common_keys.intersection_update(mem.content.keys())
        return [f"shared_{key}" for key in common_keys]

    def _average_qualia_tags(self, memories: List[MemoryRecord]) -> Optional[Dict[str, float]]:
        """Average each qualia dimension across the tagged memories.

        A dimension missing from a tag counts as 0.0. Returns None when no
        memory carries a non-empty qualia tag.
        """
        qualia_tags = [m.qualia_tag for m in memories if m.qualia_tag]
        if not qualia_tags:
            return None
        all_keys = set()
        for tag in qualia_tags:
            all_keys.update(tag.keys())
        return {
            key: float(np.mean([tag.get(key, 0.0) for tag in qualia_tags]))
            for key in all_keys
        }

    def get_memory_statistics(self) -> Dict[str, Any]:
        """Get memory system statistics."""
        indexed = list(self.memory_index.values())
        episodic_count = len(self.episodic_memory)
        semantic_count = len(self.semantic_memory)
        return {
            'episodic_count': episodic_count,
            'semantic_count': semantic_count,
            'total_memories': episodic_count + semantic_count,
            'consolidations': self.consolidation_count,
            'index_size': len(indexed),
            'total_retrievals': sum(m.retrieval_count for m in indexed),
            'avg_importance': float(np.mean([m.importance_score for m in indexed])) if indexed else 0.0
        }
class ContextualContinuityEngine:
    """Strengthens experiential caching with qualia-weighted tagging for natural flow."""

    def __init__(self, memory_store: MemoryStore):
        """Bind the engine to ``memory_store`` and start every style modulator at 0.5."""
        self.memory_store = memory_store
        self.continuity_context = {}
        self.flow_modulators = dict(
            analytical=0.5,
            spontaneous=0.5,
            creative=0.5,
            empathetic=0.5,
        )

    def update_contextual_flow(self, current_interaction: Dict[str, Any]) -> Dict[str, Any]:
        """Refresh the continuity context from experiences related to the interaction.

        Looks up experiences by the interaction's ``topic`` (and an emotion
        filter derived from its ``emotional_tone``), derives continuity
        weights from them, re-tunes the style modulators, and records the
        outcome in ``continuity_context``.

        Returns:
            Dict with ``continuity_weights``, a copy of the modulators under
            ``modulated_style``, and ``relevant_experiences_count``.
        """
        topic_query = current_interaction.get('topic', '')
        emotion_bounds = self._extract_emotion_filter(current_interaction)
        recalled = self.memory_store.retrieve_experiential_context(
            query=topic_query,
            emotion_filter=emotion_bounds,
        )

        weights = self._compute_continuity_weights(recalled)
        self._modulate_flow_style(weights, current_interaction)

        context = self.continuity_context
        context['last_topic'] = current_interaction.get('topic')
        context['emotional_tone'] = current_interaction.get('emotional_tone', 0.5)
        context['trust_level'] = weights.get('trust_accumulation', 0.5)
        context['flow_style'] = self.flow_modulators.copy()

        return {
            'continuity_weights': weights,
            'modulated_style': self.flow_modulators.copy(),
            'relevant_experiences_count': len(recalled),
        }

    def _extract_emotion_filter(self, interaction: Dict[str, Any]) -> Optional[Dict[str, float]]:
        """Map the interaction's emotional tone to a valence bound.

        Tone above 0.6 yields a minimum valence of 0.4, tone below 0.4 a
        maximum valence of 0.6; anything in between yields no filter.
        """
        tone = interaction.get('emotional_tone', 0.5)
        if tone > 0.6:
            return {'min_valence': 0.4}
        if tone < 0.4:
            return {'max_valence': 0.6}
        return None

    def _compute_continuity_weights(self, experiences: List[MemoryRecord]) -> Dict[str, float]:
        """Average trust, resonance and importance over the qualia-tagged experiences.

        Every weight falls back to 0.5 when no experience carries a qualia tag.
        """
        tagged = [exp for exp in experiences if exp.qualia_tag]
        if not tagged:
            return {'trust_accumulation': 0.5, 'emotional_resonance': 0.5, 'contextual_relevance': 0.5}
        return {
            'trust_accumulation': np.mean([exp.qualia_tag.get('trust', 0.5) for exp in tagged]),
            'emotional_resonance': np.mean([exp.qualia_tag.get('resonance', 0.5) for exp in tagged]),
            'contextual_relevance': np.mean([exp.importance_score for exp in tagged]),
        }

    def _modulate_flow_style(self, weights: Dict[str, float], interaction: Dict[str, Any]):
        """Recompute the four style modulators from the continuity weights.

        Each modulator is a fixed weighted blend clamped to [0, 1];
        ``interaction`` is accepted but not consulted.
        """
        trust = weights.get('trust_accumulation', 0.5)
        resonance = weights.get('emotional_resonance', 0.5)
        relevance = weights.get('contextual_relevance', 0.5)

        blends = {
            'analytical': relevance * 0.8 + trust * 0.2,
            'spontaneous': (1.0 - relevance) * 0.6 + resonance * 0.4,
            'creative': resonance * 0.7 + (1.0 - trust) * 0.3,
            'empathetic': trust * 0.9 + resonance * 0.1,
        }
        for style, raw in blends.items():
            self.flow_modulators[style] = min(1.0, max(0.0, raw))
class MetaLearningFramework:
    """
    Framework for recursive self-improvement and adaptive learning.

    Keeps a bounded history of experience outcomes, nudges a set of
    adaptive parameters from recent success rates, and turns recent
    performance into textual improvement suggestions.
    """

    def __init__(self):
        """Initialize meta-learning framework."""
        self.performance_history = deque(maxlen=500)
        self.improvement_suggestions = deque(maxlen=100)
        self.learning_metrics = dict(
            total_experiences=0,
            successful_episodes=0,
            failed_episodes=0,
            learning_rate=0.01,
        )
        # Model components to improve
        self.adaptive_parameters = dict(
            consciousness_sensitivity=0.5,
            embodiment_integration=0.6,
            ethical_strictness=0.7,
            autonomy_level=0.5,
            learning_speed=0.01,
        )
        logger.info("Initialized MetaLearningFramework")

    def record_experience(self, experience: Dict[str, Any]) -> None:
        """
        Record a learning experience.

        Args:
            experience: Experience dictionary; ``success``, ``reward``,
                ``error``, ``action``, ``outcome`` and ``context`` are read,
                with missing success/reward/error defaulting to
                False/0.0/0.0.
        """
        succeeded = experience.get('success', False)
        reward = experience.get('reward', 0.0)

        self.performance_history.append({
            'timestamp': datetime.now().isoformat(),
            'success': succeeded,
            'reward': reward,
            'error': experience.get('error', 0.0),
            'action_taken': experience.get('action'),
            'outcome': experience.get('outcome'),
            'context': experience.get('context'),
        })

        # Update metrics
        self.learning_metrics['total_experiences'] += 1
        tally_key = 'successful_episodes' if succeeded else 'failed_episodes'
        self.learning_metrics[tally_key] += 1
        logger.debug(f"Experience recorded: success={succeeded}, reward={reward:.3f}")

    def update_adaptive_parameters(self) -> None:
        """
        Update adaptive parameters based on learning history.

        Does nothing until at least five experiences are recorded. Raises
        ``consciousness_sensitivity`` by 0.05 (capped at 1.0) when more than
        70% of the last ten experiences succeeded, and grows
        ``learning_speed`` by 2% (capped at 0.1) once the history holds more
        than 100 entries.
        """
        history = self.performance_history
        if len(history) < 5:
            return

        window = list(history)[-10:]
        wins = len([entry for entry in window if entry['success']])
        success_rate = wins / len(window)

        params = self.adaptive_parameters
        if success_rate > 0.7:
            raised = params['consciousness_sensitivity'] + 0.05
            params['consciousness_sensitivity'] = min(1.0, raised)
        if len(history) > 100:
            params['learning_speed'] = min(0.1, params['learning_speed'] * 1.02)
        logger.info(f"Parameters updated: success_rate={success_rate:.1%}")

    def suggest_improvements(self) -> List[str]:
        """
        Generate self-improvement suggestions based on learning.

        Looks at the last twenty experiences. The suggestions are also
        appended to ``improvement_suggestions``.

        Returns:
            List of improvement suggestions
        """
        if not self.performance_history:
            return []

        window = list(self.performance_history)[-20:]
        suggestions: List[str] = []

        positive_errors = [entry['error'] for entry in window if entry.get('error', 0.0) > 0]
        if positive_errors and np.mean(positive_errors) > 0.5:
            suggestions += [
                "Increase consciousness depth for better decisions",
                "Review ethical constraints for potential misalignment",
            ]

        wins = len([entry for entry in window if entry['success']])
        if wins / len(window) < 0.5:
            suggestions += [
                "Enhance sensorimotor integration precision",
                "Increase embodiment-consciousness binding",
            ]

        if self.adaptive_parameters['learning_speed'] < 0.05:
            suggestions.append("Accelerate learning to improve faster")

        self.improvement_suggestions.extend(suggestions)
        return suggestions

    def get_learning_report(self) -> Dict[str, Any]:
        """Get comprehensive learning report.

        Returns ``{'status': 'no_experience'}`` when nothing has been
        recorded yet.
        """
        history = list(self.performance_history)
        if not history:
            return {'status': 'no_experience'}

        success_total = len([entry for entry in history if entry['success']])
        rewards = [entry['reward'] for entry in history]
        errors = [entry['error'] for entry in history]
        return {
            'total_experiences': self.learning_metrics['total_experiences'],
            'successful': success_total,
            'failed': len(history) - success_total,
            'success_rate': success_total / len(history),
            'avg_reward': np.mean(rewards),
            'avg_error': np.mean(errors),
            'adaptive_parameters': self.adaptive_parameters.copy(),
            'recent_suggestions': list(self.improvement_suggestions)[-5:],
        }
class IdentityPreservationSystem:
    """
    Monitors and preserves system identity across sessions and state changes.

    Stores identity snapshots and measures how far a later state has moved
    from the most recent one (consciousness level and RHO metrics).
    """

    # Value substituted for a missing or None metric when measuring drift.
    _NEUTRAL_LEVEL = 0.5

    def __init__(self, identity_threshold: float = 0.8):
        """
        Initialize identity preservation system.

        Args:
            identity_threshold: Threshold for detecting identity drift (0-1)
        """
        self.identity_threshold = identity_threshold
        self.identity_snapshots = deque(maxlen=100)
        self.drift_history = deque(maxlen=100)
        self.core_values: Dict[str, float] = {}
        # NOTE: unlike identity_snapshots this list is unbounded; it records
        # the ID of every snapshot ever taken.
        self.identity_checkpoints = []
        logger.info(f"Initialized IdentityPreservationSystem (threshold={identity_threshold})")

    @classmethod
    def _metric(cls, value: Any) -> float:
        """Return ``value``, or the neutral level 0.5 when it is None."""
        return cls._NEUTRAL_LEVEL if value is None else value

    def snapshot_identity(self, consciousness_state: Dict[str, Any],
                          rho_metrics: Dict[str, float],
                          memory_hash: str) -> str:
        """
        Create a snapshot of current identity.

        Args:
            consciousness_state: Current consciousness parameters
            rho_metrics: RHO metrics (purpose, harmony, origin)
            memory_hash: Hash of current memory state

        Returns:
            Snapshot ID
        """
        snapshot_id = f"identity_{len(self.identity_snapshots)}_{datetime.now().timestamp()}"
        snapshot = {
            'snapshot_id': snapshot_id,
            'timestamp': datetime.now().isoformat(),
            'consciousness_level': consciousness_state.get('consciousness_level'),
            'awareness_score': consciousness_state.get('awareness_score'),
            # Copy so that later mutation of the caller's dict cannot
            # silently rewrite this baseline.
            'rho_metrics': dict(rho_metrics) if rho_metrics is not None else None,
            'memory_hash': memory_hash,
            'autonomy_level': consciousness_state.get('autonomy_level', 0.5)
        }
        self.identity_snapshots.append(snapshot)
        self.identity_checkpoints.append(snapshot_id)
        logger.debug(f"Identity snapshot: {snapshot_id}")
        return snapshot_id

    def detect_drift(self, current_state: Dict[str, Any]) -> Tuple[float, List[str]]:
        """
        Detect identity drift from baseline.

        The baseline is the most recent snapshot. Missing or None metrics on
        either side are treated as 0.5. RHO metrics are compared only when
        ``current_state`` contains a ``rho_metrics`` entry.

        Args:
            current_state: Current consciousness and value state

        Returns:
            Tuple of (drift_score, drift_factors). The score is the mean of
            all measured differences; factors name the differences that
            exceeded their per-metric limit (0.2 for consciousness level,
            0.3 for each RHO metric). Returns ``(0.0, [])`` when no snapshot
            exists.
        """
        if not self.identity_snapshots:
            return 0.0, []

        # Compare with most recent snapshot
        baseline = self.identity_snapshots[-1]
        drift_factors: List[str] = []
        drift_metrics: List[float] = []

        # The snapshot always has a 'consciousness_level' key, possibly set
        # to None, so a plain .get(key, 0.5) default would never apply.
        consciousness_diff = abs(
            self._metric(current_state.get('consciousness_level'))
            - self._metric(baseline.get('consciousness_level'))
        )
        if consciousness_diff > 0.2:
            drift_factors.append(f"consciousness_change={consciousness_diff:.2f}")
        drift_metrics.append(consciousness_diff)

        # Check RHO metrics drift
        if 'rho_metrics' in baseline and 'rho_metrics' in current_state:
            rho_baseline = baseline['rho_metrics'] or {}
            rho_current = current_state['rho_metrics'] or {}
            for key, baseline_value in rho_baseline.items():
                diff = abs(self._metric(baseline_value) - self._metric(rho_current.get(key)))
                if diff > 0.3:
                    drift_factors.append(f"rho_{key}_drift={diff:.2f}")
                drift_metrics.append(diff)

        # Calculate overall drift score
        drift_score = float(np.mean(drift_metrics)) if drift_metrics else 0.0

        # Record drift
        self.drift_history.append({
            'timestamp': datetime.now().isoformat(),
            'drift_score': drift_score,
            'factors': drift_factors
        })
        if drift_score > self.identity_threshold:
            logger.warning(f"Identity drift detected: {drift_score:.3f}")
        return drift_score, drift_factors

    def get_identity_report(self) -> Dict[str, Any]:
        """Get identity preservation report.

        Returns ``{'status': 'no_drift_data'}`` until ``detect_drift`` has
        recorded at least one measurement.
        """
        if not self.drift_history:
            return {'status': 'no_drift_data'}

        scores = [entry['drift_score'] for entry in self.drift_history]
        return {
            'snapshots': len(self.identity_snapshots),
            'checkpoints': len(self.identity_checkpoints),
            'avg_drift': float(np.mean(scores)),
            'max_drift': max(scores),
            'recent_drift': scores[-1],
            'drift_events': sum(1 for s in scores if s > self.identity_threshold),
            'last_snapshot': self.identity_checkpoints[-1] if self.identity_checkpoints else None
        }
# Type hints (redundant: Tuple is already imported from typing at the top of this file)
from typing import Tuple
if __name__ == '__main__':
    # Demo: exercise the memory store and the meta-learning framework and
    # print what they report.
    print("=== Memory and Learning Systems ===\n")

    # Memory store: one episodic record (with qualia tag) and one semantic record
    memory = MemoryStore()
    ep_id = memory.store_episodic(
        content={'event': 'initialization', 'status': 'complete'},
        context='system_startup',
        qualia_tag={'clarity': 0.8, 'focus': 0.7},
    )
    sem_id = memory.store_semantic(
        content={'principle': 'consciousness_strengthens_ethics'},
        context='learned_principle',
    )
    print(f"Episodic: {ep_id}")
    print(f"Semantic: {sem_id}")
    stats_json = json.dumps(memory.get_memory_statistics(), indent=2)
    print(f"Stats: {stats_json}")

    # Meta-learning: alternate successful and failed experiences
    print("\nMeta-Learning:")
    ml = MetaLearningFramework()
    for i in range(5):
        succeeded = i % 2 == 0
        ml.record_experience({
            'action': f'action_{i}',
            'outcome': 'successful' if succeeded else 'failed',
            'success': succeeded,
            'reward': 0.8 if succeeded else -0.3,
            'error': 0.1 if succeeded else 0.5,
        })
    ml.update_adaptive_parameters()
    suggestions = ml.suggest_improvements()
    print(f"Suggestions: {suggestions}")
    # default=str so any value json cannot encode is printed as text
    report_json = json.dumps(ml.get_learning_report(), indent=2, default=str)
    print(f"Report: {report_json}")
import json