SCoDA / coda /core /memory.py
vanishingradient's picture
Added init files
9281fab
"""
Shared memory buffer for inter-agent communication in CoDA.
Provides thread-safe storage for agents to exchange context,
results, and feedback during the visualization pipeline.
"""
import threading
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field
class MemoryEntry(BaseModel):
    """A single entry in the shared memory."""

    # Key under which the entry is stored.
    key: str
    # The stored payload; annotated as Any, so no specific type is required.
    value: Any
    # Name of the agent that produced the entry.
    agent_name: str
    # Creation time. Defaults to datetime.now(), i.e. a naive timestamp
    # with no tzinfo attached.
    timestamp: datetime = Field(default_factory=datetime.now)
    # Optional extra context. default_factory=dict gives every entry its
    # own fresh dict when none is supplied.
    metadata: dict[str, Any] = Field(default_factory=dict)
class SharedMemory:
    """
    Thread-safe shared memory buffer for agent communication.

    Agents can store and retrieve structured data using string keys.
    Each entry tracks the source agent and timestamp for debugging.

    Every public method holds a re-entrant lock while it touches the
    internal state. Values are handed back by reference (no copy is made),
    so mutating a retrieved object is not protected by that lock.
    """

    def __init__(self) -> None:
        # Latest entry per key; storing under an existing key replaces it.
        self._storage: dict[str, MemoryEntry] = {}
        # Re-entrant, so a thread already holding the lock can call other
        # methods of this object without deadlocking itself.
        self._lock = threading.RLock()
        # Append-only log of every store() call, including entries that
        # were later overwritten in _storage.
        self._history: list[MemoryEntry] = []

    def store(
        self,
        key: str,
        value: Any,
        agent_name: str,
        metadata: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Store a value in shared memory.

        An existing entry under the same key is replaced; the replaced
        entry remains visible through get_history().

        Args:
            key: Unique identifier for the data
            value: The data to store (should be JSON-serializable)
            agent_name: Name of the agent storing the data
            metadata: Optional additional context
        """
        with self._lock:
            # Build the entry while holding the lock: its timestamp is
            # taken at construction, so doing this outside the lock would
            # let two racing threads append to the history in an order
            # that contradicts their timestamps.
            entry = MemoryEntry(
                key=key,
                value=value,
                agent_name=agent_name,
                metadata=metadata or {},
            )
            self._storage[key] = entry
            self._history.append(entry)

    def retrieve(self, key: str) -> Optional[Any]:
        """
        Retrieve a value from shared memory.

        Args:
            key: The key to look up

        Returns:
            The stored value, or None if not found
        """
        with self._lock:
            entry = self._storage.get(key)
            # Explicit None check: do not rely on the truthiness of the
            # entry object to decide whether the key was present.
            return entry.value if entry is not None else None

    def retrieve_entry(self, key: str) -> Optional[MemoryEntry]:
        """
        Retrieve the full memory entry including metadata.

        Args:
            key: The key to look up

        Returns:
            The full MemoryEntry, or None if not found
        """
        with self._lock:
            return self._storage.get(key)

    def get_context(self, keys: list[str]) -> dict[str, Any]:
        """
        Retrieve multiple values as a context dictionary.

        Args:
            keys: List of keys to retrieve

        Returns:
            Dictionary mapping keys to their values (missing keys excluded)
        """
        with self._lock:
            context: dict[str, Any] = {}
            for key in keys:
                # Single lookup per key instead of a membership test
                # followed by indexing.
                entry = self._storage.get(key)
                if entry is not None:
                    context[key] = entry.value
            return context

    def get_all(self) -> dict[str, Any]:
        """
        Retrieve all stored values.

        Returns:
            Dictionary mapping all keys to their values
        """
        with self._lock:
            return {key: entry.value for key, entry in self._storage.items()}

    def get_history(self, agent_name: Optional[str] = None) -> list[MemoryEntry]:
        """
        Get the history of all memory operations.

        Args:
            agent_name: Optional filter by agent name. None (the default)
                disables filtering; any string, including the empty
                string, selects only entries stored under exactly that
                agent name.

        Returns:
            List of memory entries in chronological order (a new list;
            the entries themselves are not copied)
        """
        with self._lock:
            # Compare against None rather than testing truthiness so that
            # an empty agent name still filters instead of silently
            # returning the whole history.
            if agent_name is not None:
                return [e for e in self._history if e.agent_name == agent_name]
            return list(self._history)

    def has_key(self, key: str) -> bool:
        """Check if a key exists in memory."""
        with self._lock:
            return key in self._storage

    def clear(self) -> None:
        """Clear all stored data and history."""
        with self._lock:
            self._storage.clear()
            self._history.clear()

    def keys(self) -> list[str]:
        """Get all stored keys."""
        with self._lock:
            # Snapshot taken under the lock, so callers can iterate it
            # safely while other threads keep storing.
            return list(self._storage)