""" Shared memory buffer for inter-agent communication in CoDA. Provides thread-safe storage for agents to exchange context, results, and feedback during the visualization pipeline. """ import threading from datetime import datetime from typing import Any, Optional from pydantic import BaseModel, Field class MemoryEntry(BaseModel): """A single entry in the shared memory.""" key: str value: Any agent_name: str timestamp: datetime = Field(default_factory=datetime.now) metadata: dict[str, Any] = Field(default_factory=dict) class SharedMemory: """ Thread-safe shared memory buffer for agent communication. Agents can store and retrieve structured data using string keys. Each entry tracks the source agent and timestamp for debugging. """ def __init__(self) -> None: self._storage: dict[str, MemoryEntry] = {} self._lock = threading.RLock() self._history: list[MemoryEntry] = [] def store( self, key: str, value: Any, agent_name: str, metadata: Optional[dict[str, Any]] = None, ) -> None: """ Store a value in shared memory. Args: key: Unique identifier for the data value: The data to store (should be JSON-serializable) agent_name: Name of the agent storing the data metadata: Optional additional context """ entry = MemoryEntry( key=key, value=value, agent_name=agent_name, metadata=metadata or {}, ) with self._lock: self._storage[key] = entry self._history.append(entry) def retrieve(self, key: str) -> Optional[Any]: """ Retrieve a value from shared memory. Args: key: The key to look up Returns: The stored value, or None if not found """ with self._lock: entry = self._storage.get(key) return entry.value if entry else None def retrieve_entry(self, key: str) -> Optional[MemoryEntry]: """ Retrieve the full memory entry including metadata. Args: key: The key to look up Returns: The full MemoryEntry, or None if not found """ with self._lock: return self._storage.get(key) def get_context(self, keys: list[str]) -> dict[str, Any]: """ Retrieve multiple values as a context dictionary. Args: keys: List of keys to retrieve Returns: Dictionary mapping keys to their values (missing keys excluded) """ with self._lock: return { key: self._storage[key].value for key in keys if key in self._storage } def get_all(self) -> dict[str, Any]: """ Retrieve all stored values. Returns: Dictionary mapping all keys to their values """ with self._lock: return {key: entry.value for key, entry in self._storage.items()} def get_history(self, agent_name: Optional[str] = None) -> list[MemoryEntry]: """ Get the history of all memory operations. Args: agent_name: Optional filter by agent name Returns: List of memory entries in chronological order """ with self._lock: if agent_name: return [e for e in self._history if e.agent_name == agent_name] return list(self._history) def has_key(self, key: str) -> bool: """Check if a key exists in memory.""" with self._lock: return key in self._storage def clear(self) -> None: """Clear all stored data and history.""" with self._lock: self._storage.clear() self._history.clear() def keys(self) -> list[str]: """Get all stored keys.""" with self._lock: return list(self._storage.keys())