"""
MemoryShell - Temporal Memory Architecture for Recursive Agents

This module implements the memory shell architecture that enables agents to
maintain persistent memory with configurable decay properties. The memory
shell acts as a cognitive substrate that provides:

- Short-term working memory
- Medium-term episodic memory with decay
- Long-term semantic memory with compression
- Temporal relationship tracking
- Experience-based learning

Internal Note: The memory shell simulates the MEMTRACE and ECHO-LOOP interpretability
shells for modeling memory decay and feedback loops in agent cognition.
"""
| |
|
import datetime
import heapq
import math
import uuid
from collections import Counter, defaultdict, deque
from typing import Any, Dict, List, Optional, Set, Tuple

import numpy as np
from pydantic import BaseModel, Field
| |
|
| |
|
class Memory(BaseModel):
    """Generic memory record carrying provenance, links and decay parameters.

    The effective salience of a record shrinks exponentially with the time
    elapsed since its creation and receives a small logarithmic boost from
    the number of recorded accesses.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    content: Dict[str, Any] = Field(...)
    memory_type: str = Field(...)
    creation_time: datetime.datetime = Field(default_factory=datetime.datetime.now)
    last_access_time: datetime.datetime = Field(default_factory=datetime.datetime.now)
    access_count: int = Field(default=1)
    salience: float = Field(default=1.0)
    decay_rate: float = Field(default=0.1)
    associations: Dict[str, float] = Field(default_factory=dict)
    source: Optional[str] = Field(default=None)
    tags: List[str] = Field(default_factory=list)

    def update_access(self) -> None:
        """Register one more access and stamp it with the current local time."""
        self.access_count = self.access_count + 1
        self.last_access_time = datetime.datetime.now()

    def calculate_current_salience(self) -> float:
        """Return the decayed salience of this memory, capped at 1.0.

        The stored ``salience`` is multiplied by ``exp(-decay_rate * age)``
        (age measured in hours since ``creation_time``) and by
        ``1 + log1p(access_count) / 10``.
        """
        age = datetime.datetime.now() - self.creation_time
        age_hours = age.total_seconds() / 3600

        decayed = self.salience * math.exp(-self.decay_rate * age_hours)
        boosted = decayed * (1 + math.log1p(self.access_count) / 10)

        return min(1.0, boosted)

    def add_association(self, memory_id: str, strength: float = 0.5) -> None:
        """
        Link this memory to another one, replacing any previous strength.

        Args:
            memory_id: ID of the memory to link to
            strength: Association strength (0-1)
        """
        self.associations.update({memory_id: strength})

    def add_tag(self, tag: str) -> None:
        """
        Attach a semantic tag unless the memory already carries it.

        Args:
            tag: Tag to attach
        """
        if tag in self.tags:
            return
        self.tags.append(tag)

    def as_dict(self) -> Dict[str, Any]:
        """Build an export dictionary; timestamps are rendered as ISO strings."""
        return dict(
            id=self.id,
            content=self.content,
            memory_type=self.memory_type,
            creation_time=self.creation_time.isoformat(),
            last_access_time=self.last_access_time.isoformat(),
            access_count=self.access_count,
            salience=self.salience,
            current_salience=self.calculate_current_salience(),
            decay_rate=self.decay_rate,
            associations=self.associations,
            source=self.source,
            tags=self.tags,
        )
| |
|
| |
|
class EpisodicMemory(Memory):
    """Episodic memory representing a specific experience.

    ``memory_type`` is always forced to ``"episodic"``, whatever the caller
    passes.
    """

    # Position of this episode in the owning shell's temporal sequence, if set.
    sequence_position: Optional[int] = Field(default=None)
    # Emotional charge of the experience (documented by callers as -1 to 1).
    emotional_valence: float = Field(default=0.0)
    # Free-form description of how the experience ended, if known.
    outcome: Optional[str] = Field(default=None)

    def __init__(self, **data):
        data["memory_type"] = "episodic"
        super().__init__(**data)

    def as_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for export, including episodic-only fields.

        The base export omits ``sequence_position``, ``emotional_valence`` and
        ``outcome``; without them an export/import round trip silently resets
        these values to their defaults.
        """
        exported = super().as_dict()
        exported.update(
            sequence_position=self.sequence_position,
            emotional_valence=self.emotional_valence,
            outcome=self.outcome,
        )
        return exported
| |
|
| |
|
class SemanticMemory(Memory):
    """Semantic memory representing conceptual knowledge.

    ``memory_type`` is always forced to ``"semantic"`` and the decay rate
    defaults to 0.05 when the caller does not supply one.
    """

    # Confidence in this piece of knowledge (0-1).
    certainty: float = Field(default=0.7)
    # IDs of memories that contradict this one.
    contradiction_ids: List[str] = Field(default_factory=list)
    # IDs of memories that support this one.
    supporting_evidence: List[str] = Field(default_factory=list)

    def __init__(self, **data):
        data["memory_type"] = "semantic"
        data.setdefault("decay_rate", 0.05)
        super().__init__(**data)

    def add_evidence(self, memory_id: str, is_supporting: bool = True) -> None:
        """
        Add supporting or contradicting evidence (duplicates are ignored).

        Args:
            memory_id: Memory ID for evidence
            is_supporting: Whether evidence is supporting (True) or contradicting (False)
        """
        bucket = self.supporting_evidence if is_supporting else self.contradiction_ids
        if memory_id not in bucket:
            bucket.append(memory_id)

    def update_certainty(self, evidence_ratio: float) -> None:
        """
        Update certainty based on supporting/contradicting evidence ratio.

        The new certainty is a 70/30 blend of the previous certainty and the
        evidence ratio. The ratio is clamped to 0-1 first so that an
        out-of-range argument cannot push certainty outside its documented
        range.

        Args:
            evidence_ratio: Ratio of supporting to total evidence (0-1)
        """
        bounded_ratio = min(1.0, max(0.0, evidence_ratio))
        self.certainty = 0.7 * self.certainty + 0.3 * bounded_ratio

    def as_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for export, including semantic-only fields.

        The base export omits ``certainty``, ``contradiction_ids`` and
        ``supporting_evidence``; without them an export/import round trip
        silently resets these values to their defaults.
        """
        exported = super().as_dict()
        exported.update(
            certainty=self.certainty,
            contradiction_ids=list(self.contradiction_ids),
            supporting_evidence=list(self.supporting_evidence),
        )
        return exported
| |
|
| |
|
class WorkingMemory(Memory):
    """Working memory representing active thinking and temporary storage.

    ``memory_type`` is always forced to ``"working"`` and the decay rate
    defaults to 0.5 when the caller does not supply one.
    """

    # Moment after which the item counts as expired; defaults to one hour from creation.
    expiration_time: datetime.datetime = Field(
        default_factory=lambda: datetime.datetime.now() + datetime.timedelta(hours=1)
    )
    # Relative importance of the item.
    priority: int = Field(default=1)

    def __init__(self, **data):
        data["memory_type"] = "working"
        data.setdefault("decay_rate", 0.5)
        super().__init__(**data)

    def set_expiration(self, hours: float) -> None:
        """
        Set expiration time for working memory, counted from now.

        Args:
            hours: Hours until expiration
        """
        self.expiration_time = datetime.datetime.now() + datetime.timedelta(hours=hours)

    def is_expired(self) -> bool:
        """Check if working memory has expired."""
        return datetime.datetime.now() > self.expiration_time

    def as_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for export, including working-memory fields.

        The base export omits ``priority`` and ``expiration_time``; without
        them an export/import round trip silently resets both. The expiration
        is emitted as an ISO-8601 string, like the other timestamps.
        """
        exported = super().as_dict()
        exported.update(
            priority=self.priority,
            expiration_time=self.expiration_time.isoformat(),
        )
        return exported
| |
|
| |
|
class MemoryShell:
    """
    Memory shell architecture for agent cognitive persistence.

    The MemoryShell provides:
    - Multi-tiered memory system (working, episodic, semantic)
    - Configurable decay rates for different memory types
    - Time-based and access-based memory reinforcement
    - Associative memory network with activation spread
    - Query capabilities with relevance ranking

    Working memories have no tag index of their own: their tags are stored in
    ``episodic_index`` so that tag-filtered queries can find them.
    """

    def __init__(self, decay_rate: float = 0.2):
        """
        Initialize memory shell.

        Args:
            decay_rate: Base decay rate for memories
        """
        self.memories: Dict[str, Memory] = {}
        self.decay_rate = decay_rate
        self.working_memory_capacity = 7
        self.episodic_index: Dict[str, Set[str]] = defaultdict(set)
        self.semantic_index: Dict[str, Set[str]] = defaultdict(set)
        self.temporal_sequence: List[str] = []
        self.activation_threshold = 0.1

        self.stats = {
            "total_memories_created": 0,
            "total_memories_decayed": 0,
            "working_memory_count": 0,
            "episodic_memory_count": 0,
            "semantic_memory_count": 0,
            "average_salience": 0.0,
            "association_count": 0,
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _count_by_type(self, memory_type: str) -> int:
        """Return how many stored memories have the given ``memory_type``."""
        return sum(1 for m in self.memories.values() if m.memory_type == memory_type)

    def _refresh_type_counts(self) -> None:
        """Recompute the per-type counters in ``stats`` from the live store."""
        for kind in ("working", "episodic", "semantic"):
            self.stats[f"{kind}_memory_count"] = self._count_by_type(kind)

    def _refresh_average_salience(self) -> None:
        """Recompute ``stats['average_salience']``; 0.0 when nothing is stored."""
        if self.memories:
            total = sum(m.calculate_current_salience() for m in self.memories.values())
            self.stats["average_salience"] = total / len(self.memories)
        else:
            # Without this reset the last non-empty average would linger forever.
            self.stats["average_salience"] = 0.0

    def _export_accessed(self, memory_id: str) -> Dict[str, Any]:
        """Mark the memory as accessed and return its dictionary export."""
        memory = self.memories[memory_id]
        memory.update_access()
        return memory.as_dict()

    def _common_memory_fields(self, memory_id: str, memory_dict: Dict[str, Any],
                              default_decay: float, default_source: str) -> Dict[str, Any]:
        """Build the constructor arguments shared by every memory type on import."""
        now_iso = datetime.datetime.now().isoformat()
        return {
            "id": memory_id,
            "content": memory_dict.get("content", {}),
            "decay_rate": memory_dict.get("decay_rate", default_decay),
            "tags": memory_dict.get("tags", []),
            "source": memory_dict.get("source", default_source),
            "salience": memory_dict.get("salience", 1.0),
            "creation_time": datetime.datetime.fromisoformat(
                memory_dict.get("creation_time", now_iso)),
            "last_access_time": datetime.datetime.fromisoformat(
                memory_dict.get("last_access_time", now_iso)),
            "access_count": memory_dict.get("access_count", 1),
            "associations": memory_dict.get("associations", {}),
        }

    # ------------------------------------------------------------------
    # Creation
    # ------------------------------------------------------------------

    def add_working_memory(self, content: Dict[str, Any], priority: int = 1,
                           expiration_hours: float = 1.0,
                           tags: Optional[List[str]] = None) -> str:
        """
        Add item to working memory.

        The capacity limit is enforced right after insertion, so the returned
        ID may belong to an item that was evicted immediately if it ranked
        lowest.

        Args:
            content: Memory content
            priority: Priority level (higher = more important)
            expiration_hours: Hours until expiration
            tags: Semantic tags

        Returns:
            Memory ID
        """
        memory = WorkingMemory(
            content=content,
            priority=priority,
            decay_rate=self.decay_rate * 2,
            tags=tags or [],
            source="working",
        )
        memory.set_expiration(expiration_hours)

        self.memories[memory.id] = memory

        # Working memories share the episodic tag index (see class docstring).
        for tag in memory.tags:
            self.episodic_index[tag].add(memory.id)

        self._enforce_working_memory_capacity()

        self.stats["total_memories_created"] += 1
        # Recount instead of incrementing: evictions above would otherwise make
        # the counter drift past the real number of working memories.
        self.stats["working_memory_count"] = self._count_by_type("working")

        return memory.id

    def add_episodic_memory(self, content: Dict[str, Any], emotional_valence: float = 0.0,
                            outcome: Optional[str] = None,
                            tags: Optional[List[str]] = None) -> str:
        """
        Add episodic memory and append it to the temporal sequence.

        Args:
            content: Memory content
            emotional_valence: Emotional charge (-1 to 1)
            outcome: Outcome of the experience
            tags: Semantic tags

        Returns:
            Memory ID
        """
        memory = EpisodicMemory(
            content=content,
            emotional_valence=emotional_valence,
            outcome=outcome,
            decay_rate=self.decay_rate,
            tags=tags or [],
            source="episode",
        )

        # Position is the index the ID is about to occupy in temporal_sequence.
        memory.sequence_position = len(self.temporal_sequence)

        self.memories[memory.id] = memory

        for tag in memory.tags:
            self.episodic_index[tag].add(memory.id)

        self.temporal_sequence.append(memory.id)

        self.stats["total_memories_created"] += 1
        self.stats["episodic_memory_count"] += 1

        return memory.id

    def add_semantic_memory(self, content: Dict[str, Any], certainty: float = 0.7,
                            tags: Optional[List[str]] = None) -> str:
        """
        Add semantic memory (decays at half the shell's base rate).

        Args:
            content: Memory content
            certainty: Certainty level (0-1)
            tags: Semantic tags

        Returns:
            Memory ID
        """
        memory = SemanticMemory(
            content=content,
            certainty=certainty,
            decay_rate=self.decay_rate * 0.5,
            tags=tags or [],
            source="semantic",
        )

        self.memories[memory.id] = memory

        for tag in memory.tags:
            self.semantic_index[tag].add(memory.id)

        self.stats["total_memories_created"] += 1
        self.stats["semantic_memory_count"] += 1

        return memory.id

    def add_experience(self, experience: Dict[str, Any]) -> Tuple[str, List[str]]:
        """
        Add new experience as episodic memory and extract semantic memories.

        Every dict found in ``experience["insights"]`` becomes a semantic
        memory that is associated (strength 0.8) with the new episode.
        Non-dict insights are ignored.

        Args:
            experience: Experience data

        Returns:
            Tuple of (episodic_id, list of semantic_ids)
        """
        # Fall back to the experience type when no explicit tags are given.
        tags = experience.get("tags", [])
        if not tags and "type" in experience:
            tags = [experience["type"]]

        episodic_id = self.add_episodic_memory(
            content=experience,
            emotional_valence=experience.get("emotional_valence", 0.0),
            outcome=experience.get("outcome"),
            tags=tags,
        )

        semantic_ids = []
        insights = experience.get("insights")
        if isinstance(insights, list):
            for insight in insights:
                if not isinstance(insight, dict):
                    continue
                semantic_id = self.add_semantic_memory(
                    content=insight,
                    certainty=insight.get("confidence", 0.7),
                    tags=insight.get("tags", tags),
                )
                semantic_ids.append(semantic_id)
                self.add_association(episodic_id, semantic_id, 0.8)

        return episodic_id, semantic_ids

    def add_association(self, memory_id1: str, memory_id2: str, strength: float = 0.5) -> bool:
        """
        Add bidirectional association between memories.

        ``stats["association_count"]`` grows only by the number of directed
        links that did not exist before, so re-associating an existing pair
        (which merely overwrites the strength) or linking a memory to itself
        is not over-counted.

        Args:
            memory_id1: First memory ID
            memory_id2: Second memory ID
            strength: Association strength (0-1)

        Returns:
            Success status (False when either ID is unknown)
        """
        if memory_id1 not in self.memories or memory_id2 not in self.memories:
            return False

        new_links = 0
        for owner_id, target_id in ((memory_id1, memory_id2), (memory_id2, memory_id1)):
            owner = self.memories[owner_id]
            if target_id not in owner.associations:
                new_links += 1
            owner.add_association(target_id, strength)

        self.stats["association_count"] += new_links

        return True

    # ------------------------------------------------------------------
    # Retrieval
    # ------------------------------------------------------------------

    def get_memory(self, memory_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve memory by ID (counts as an access).

        Args:
            memory_id: Memory ID

        Returns:
            Memory data or None if not found
        """
        if memory_id not in self.memories:
            return None
        return self._export_accessed(memory_id)

    def query_memories(self, query: Dict[str, Any], memory_type: Optional[str] = None,
                       tags: Optional[List[str]] = None, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Query memories based on content, type, and tags.

        Candidates below the activation threshold are skipped. The rest are
        ranked by ``0.7 * relevance + 0.3 * current salience``; every returned
        memory is marked as accessed.

        Args:
            query: Query terms
            memory_type: Optional filter by memory type
            tags: Optional filter by tags (a memory matches if it carries any of them)
            limit: Maximum number of results

        Returns:
            List of matching memories, each with an added "relevance_score"
        """
        if memory_type:
            candidate_ids = {
                memory_id for memory_id, memory in self.memories.items()
                if memory.memory_type == memory_type
            }
        else:
            candidate_ids = set(self.memories)

        if tags:
            tagged_ids: Set[str] = set()
            for tag in tags:
                tagged_ids.update(self.episodic_index.get(tag, ()))
                tagged_ids.update(self.semantic_index.get(tag, ()))
            # Always apply the filter: when nothing carries the requested tags
            # the result must be empty rather than falling back to everything.
            candidate_ids &= tagged_ids

        scored_candidates = []
        for memory_id in candidate_ids:
            memory = self.memories[memory_id]

            current_salience = memory.calculate_current_salience()
            if current_salience < self.activation_threshold:
                continue

            relevance = self._calculate_relevance(memory, query)
            scored_candidates.append((memory_id, 0.7 * relevance + 0.3 * current_salience))

        top_candidates = heapq.nlargest(limit, scored_candidates, key=lambda item: item[1])

        result_memories = []
        for memory_id, score in top_candidates:
            memory_dict = self._export_accessed(memory_id)
            memory_dict["relevance_score"] = score
            result_memories.append(memory_dict)

        return result_memories

    def get_recent_memories(self, memory_type: Optional[str] = None,
                            limit: int = 5) -> List[Dict[str, Any]]:
        """
        Get most recent memories by creation time (newest first).

        Args:
            memory_type: Optional filter by memory type
            limit: Maximum number of results

        Returns:
            List of recent memories
        """
        matching = [
            (memory_id, memory.creation_time)
            for memory_id, memory in self.memories.items()
            if not memory_type or memory.memory_type == memory_type
        ]
        matching.sort(key=lambda item: item[1], reverse=True)

        return [self._export_accessed(memory_id) for memory_id, _ in matching[:limit]]

    def get_temporal_sequence(self, start_index: int = 0, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Get temporal sequence of episodic memories.

        Args:
            start_index: Starting index in sequence
            limit: Maximum number of results

        Returns:
            List of episodic memories in temporal order
        """
        window = self.temporal_sequence[start_index:start_index + limit]
        return [
            self._export_accessed(memory_id)
            for memory_id in window
            if memory_id in self.memories
        ]

    def get_relevant_experiences(self, query: Optional[Dict[str, Any]] = None,
                                 tags: Optional[List[str]] = None,
                                 limit: int = 5) -> List[Dict[str, Any]]:
        """
        Get relevant episodic experiences.

        With a query this delegates to ``query_memories`` restricted to
        episodic memories. Without one, active episodic memories (optionally
        restricted to the given tags) are ranked by current salience.

        Args:
            query: Optional query terms
            tags: Optional filter by tags
            limit: Maximum number of results

        Returns:
            List of relevant experiences
        """
        if query:
            return self.query_memories(query, memory_type="episodic", tags=tags, limit=limit)

        if tags:
            candidate_ids: Set[str] = set()
            for tag in tags:
                candidate_ids.update(self.episodic_index.get(tag, ()))
        else:
            candidate_ids = set(self.memories)

        scored_candidates = []
        for memory_id in candidate_ids:
            memory = self.memories.get(memory_id)
            # The episodic index also holds working-memory IDs (and may hold
            # stale ones), so the type has to be checked explicitly.
            if memory is None or memory.memory_type != "episodic":
                continue

            current_salience = memory.calculate_current_salience()
            if current_salience < self.activation_threshold:
                continue

            scored_candidates.append((memory_id, current_salience))

        top_candidates = heapq.nlargest(limit, scored_candidates, key=lambda item: item[1])

        return [self._export_accessed(memory_id) for memory_id, _ in top_candidates]

    def get_beliefs(self, tags: Optional[List[str]] = None,
                    certainty_threshold: float = 0.5) -> List[Dict[str, Any]]:
        """
        Get semantic beliefs with high certainty.

        Beliefs are ranked by ``0.6 * certainty + 0.4 * current salience``;
        every returned memory is marked as accessed.

        Args:
            tags: Optional filter by tags
            certainty_threshold: Minimum certainty threshold

        Returns:
            List of semantic beliefs, each with an added "belief_score"
        """
        if tags:
            candidate_ids: Set[str] = set()
            for tag in tags:
                candidate_ids.update(self.semantic_index.get(tag, ()))
        else:
            candidate_ids = {
                memory_id for memory_id, memory in self.memories.items()
                if memory.memory_type == "semantic"
            }

        scored_candidates = []
        for memory_id in candidate_ids:
            memory = self.memories.get(memory_id)
            if memory is None:
                continue
            if memory.memory_type != "semantic" or not hasattr(memory, "certainty"):
                continue
            if memory.certainty < certainty_threshold:
                continue

            current_salience = memory.calculate_current_salience()
            if current_salience < self.activation_threshold:
                continue

            scored_candidates.append((memory_id, 0.6 * memory.certainty + 0.4 * current_salience))

        scored_candidates.sort(key=lambda item: item[1], reverse=True)

        result_memories = []
        for memory_id, score in scored_candidates:
            memory_dict = self._export_accessed(memory_id)
            memory_dict["belief_score"] = score
            result_memories.append(memory_dict)

        return result_memories

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def apply_decay(self) -> int:
        """
        Apply memory decay to all memories and clean up decayed memories.

        A memory is removed when its current salience is below the activation
        threshold. Type counters and the average salience are refreshed
        afterwards (the average becomes 0.0 when no memory is left).

        Returns:
            Number of memories removed due to decay
        """
        # Collect first: the store must not change size while being iterated.
        to_remove = [
            memory_id for memory_id, memory in self.memories.items()
            if memory.calculate_current_salience() < self.activation_threshold
        ]

        for memory_id in to_remove:
            self._remove_memory(memory_id)

        self.stats["total_memories_decayed"] += len(to_remove)
        self._refresh_type_counts()
        self._refresh_average_salience()

        return len(to_remove)

    def consolidate_memories(self) -> Dict[str, Any]:
        """
        Consolidate episodic memories into semantic memories.

        Looks at the ten most recent episodic memories (which counts as an
        access to each of them). A tag is "common" when at least three of
        those memories carry it; each memory contributes to a tag at most
        once even if its tag list contains duplicates. When common tags exist,
        one semantic memory is created that records the common tags and the
        IDs of the episodes carrying at least one of them.

        Returns:
            Consolidation results
        """
        recent_episodic = self.get_recent_memories(memory_type="episodic", limit=10)

        tag_counts = Counter()
        for memory in recent_episodic:
            tag_counts.update(set(memory.get("tags", [])))

        # Sorted so that the summary text and tag order are deterministic.
        common_tags = sorted(tag for tag, count in tag_counts.items() if count >= 3)

        consolidated_ids = []
        if common_tags:
            shared = set(common_tags)
            # Only episodes that actually carry a common tag were consolidated.
            source_ids = [
                memory.get("id") for memory in recent_episodic
                if shared.intersection(memory.get("tags", []))
            ]
            semantic_id = self.add_semantic_memory(
                content={
                    "consolidated_from": source_ids,
                    "common_tags": list(common_tags),
                    "summary": f"Consolidated memory with common tags: {', '.join(common_tags)}"
                },
                certainty=0.6,
                tags=list(common_tags),
            )
            consolidated_ids.append(semantic_id)

        return {
            "consolidated_count": len(consolidated_ids),
            "consolidated_ids": consolidated_ids,
            "common_tags": list(common_tags),
        }

    def _calculate_relevance(self, memory: "Memory", query: Dict[str, Any]) -> float:
        """
        Calculate relevance score of memory to query.

        Scoring: up to 0.3 for the share of query keys present in the content,
        0.2 per string value contained (case-insensitively) in the matching
        content string, 0.3 per exactly equal non-string value, and up to 0.3
        for the share of ``query["tags"]`` found among the memory's tags.

        Args:
            memory: Memory to score
            query: Query terms

        Returns:
            Relevance score (0-1)
        """
        relevance = 0.0
        content = memory.content

        matching_keys = set(query.keys()).intersection(set(content.keys()))
        if matching_keys:
            relevance += 0.3 * (len(matching_keys) / len(query))

        for key, value in query.items():
            if key not in content:
                continue
            stored = content[key]
            if isinstance(value, str) and isinstance(stored, str):
                if value.lower() in stored.lower():
                    relevance += 0.2
            elif value == stored:
                relevance += 0.3

        query_tags = query.get("tags", [])
        if isinstance(query_tags, list) and memory.tags:
            matching_tags = set(query_tags).intersection(set(memory.tags))
            if matching_tags:
                relevance += 0.3 * (len(matching_tags) / len(query_tags))

        return min(1.0, relevance)

    def _enforce_working_memory_capacity(self) -> None:
        """Enforce working memory capacity limit by removing low priority items.

        Items are ranked by (priority, current salience); the lowest-ranked
        ones are removed until the capacity is respected.
        """
        working_memories = [
            (memory_id, memory) for memory_id, memory in self.memories.items()
            if memory.memory_type == "working"
        ]

        overflow = len(working_memories) - self.working_memory_capacity
        if overflow <= 0:
            return

        working_memories.sort(
            key=lambda item: (item[1].priority, item[1].calculate_current_salience()))

        for memory_id, _ in working_memories[:overflow]:
            self._remove_memory(memory_id)

    def _remove_memory(self, memory_id: str) -> None:
        """
        Remove memory by ID, together with its index entries and back-links.

        Unknown IDs are ignored.

        Args:
            memory_id: Memory ID to remove
        """
        memory = self.memories.pop(memory_id, None)
        if memory is None:
            return

        # Clean both indices regardless of type: working memories are indexed
        # under episodic_index and would otherwise leave stale IDs behind.
        for tag in memory.tags:
            for index in (self.episodic_index, self.semantic_index):
                if tag in index:
                    index[tag].discard(memory_id)

        if memory.memory_type == "episodic" and memory_id in self.temporal_sequence:
            self.temporal_sequence.remove(memory_id)

        # Drop associations that other memories hold towards the removed one.
        for other_memory in self.memories.values():
            other_memory.associations.pop(memory_id, None)

    # ------------------------------------------------------------------
    # Persistence and statistics
    # ------------------------------------------------------------------

    def export_state(self) -> Dict[str, Any]:
        """
        Export memory shell state.

        The sequence, index and stats containers in the result are copies, so
        later changes to the shell do not alter an already exported state.

        Returns:
            Serializable memory shell state
        """
        return {
            "memories": {
                memory_id: memory.as_dict() for memory_id, memory in self.memories.items()
            },
            "episodic_index": {
                tag: list(memory_ids) for tag, memory_ids in self.episodic_index.items()
            },
            "semantic_index": {
                tag: list(memory_ids) for tag, memory_ids in self.semantic_index.items()
            },
            "temporal_sequence": list(self.temporal_sequence),
            "decay_rate": self.decay_rate,
            "activation_threshold": self.activation_threshold,
            "working_memory_capacity": self.working_memory_capacity,
            "stats": dict(self.stats),
        }

    def import_state(self, state: Dict[str, Any]) -> None:
        """
        Import memory shell state, replacing the current memories and indices.

        Memories whose "memory_type" is not working/episodic/semantic are
        skipped. Settings and statistics missing from ``state`` keep their
        current values; containers are copied so the shell never shares
        mutable objects with ``state``.

        Args:
            state: Memory shell state
        """
        self.memories = {}
        self.episodic_index = defaultdict(set)
        self.semantic_index = defaultdict(set)

        self.decay_rate = state.get("decay_rate", self.decay_rate)
        self.activation_threshold = state.get("activation_threshold", self.activation_threshold)
        self.working_memory_capacity = state.get(
            "working_memory_capacity", self.working_memory_capacity)

        for memory_id, memory_dict in state.get("memories", {}).items():
            memory_type = memory_dict.get("memory_type")

            if memory_type == "working":
                memory = WorkingMemory(
                    priority=memory_dict.get("priority", 1),
                    **self._common_memory_fields(
                        memory_id, memory_dict, self.decay_rate * 2, "working"),
                )
                if "expiration_time" in memory_dict:
                    memory.expiration_time = datetime.datetime.fromisoformat(
                        memory_dict["expiration_time"])
                else:
                    memory.set_expiration(1.0)
            elif memory_type == "episodic":
                memory = EpisodicMemory(
                    emotional_valence=memory_dict.get("emotional_valence", 0.0),
                    outcome=memory_dict.get("outcome"),
                    sequence_position=memory_dict.get("sequence_position"),
                    **self._common_memory_fields(
                        memory_id, memory_dict, self.decay_rate, "episode"),
                )
            elif memory_type == "semantic":
                memory = SemanticMemory(
                    certainty=memory_dict.get("certainty", 0.7),
                    contradiction_ids=memory_dict.get("contradiction_ids", []),
                    supporting_evidence=memory_dict.get("supporting_evidence", []),
                    **self._common_memory_fields(
                        memory_id, memory_dict, self.decay_rate * 0.5, "semantic"),
                )
            else:
                continue

            self.memories[memory_id] = memory

        for tag, memory_ids in state.get("episodic_index", {}).items():
            self.episodic_index[tag] = set(memory_ids)

        for tag, memory_ids in state.get("semantic_index", {}).items():
            self.semantic_index[tag] = set(memory_ids)

        self.temporal_sequence = list(state.get("temporal_sequence", []))

        # Merge onto the current stats so counters absent from the imported
        # state still exist afterwards (later "+= 1" updates rely on them).
        merged_stats = dict(self.stats)
        merged_stats.update(state.get("stats", {}))
        self.stats = merged_stats

        self._refresh_type_counts()

    def get_stats(self) -> Dict[str, Any]:
        """
        Get memory shell statistics.

        Refreshes the per-type counters and the average salience (0.0 when the
        store is empty) before reporting.

        Returns:
            Memory statistics
        """
        self._refresh_type_counts()

        saliences = [m.calculate_current_salience() for m in self.memories.values()]
        self.stats["average_salience"] = sum(saliences) / len(saliences) if saliences else 0.0

        active_memories = sum(1 for value in saliences if value >= self.activation_threshold)

        tag_stats = {
            "episodic_tags": len(self.episodic_index),
            "semantic_tags": len(self.semantic_index),
        }

        decay_stats = {
            "activation_threshold": self.activation_threshold,
            "active_memory_ratio": active_memories / len(self.memories) if self.memories else 0,
            "decay_rate": self.decay_rate,
        }

        return {
            **self.stats,
            **tag_stats,
            **decay_stats,
            "total_memories": len(self.memories),
            "active_memories": active_memories,
        }
| |
|