Spaces:
Sleeping
Sleeping
| """ | |
| Shared memory buffer for inter-agent communication in CoDA. | |
| Provides thread-safe storage for agents to exchange context, | |
| results, and feedback during the visualization pipeline. | |
| """ | |
| import threading | |
| from datetime import datetime | |
| from typing import Any, Optional | |
| from pydantic import BaseModel, Field | |
class MemoryEntry(BaseModel):
    """
    One record written to the shared memory buffer.

    Attributes:
        key: Identifier the value was stored under.
        value: The stored payload; any type is accepted.
        agent_name: Name of the agent that wrote the record.
        timestamp: Creation time; filled in by ``datetime.now`` when omitted.
        metadata: Extra context from the writer; a fresh empty dict when omitted.
    """

    key: str
    value: Any
    agent_name: str
    # ``datetime.now`` is called with no tz, so the default is a naive local time.
    timestamp: datetime = Field(default_factory=datetime.now)
    # A factory (not a literal ``{}``) so instances never share one dict.
    metadata: dict[str, Any] = Field(default_factory=dict)
class SharedMemory:
    """
    Thread-safe shared memory buffer for agent communication.

    Agents store and retrieve values under string keys. The latest entry
    for each key lives in ``_storage``; every write is also appended to
    ``_history``, so overwritten values remain visible for debugging.
    All access to both containers happens while holding ``_lock``.
    """

    def __init__(self) -> None:
        """Create an empty buffer with no stored keys and no history."""
        self._storage: dict[str, MemoryEntry] = {}
        # Re-entrant, so a method that holds the lock may call another
        # locked method on the same thread without deadlocking.
        self._lock = threading.RLock()
        self._history: list[MemoryEntry] = []

    def store(
        self,
        key: str,
        value: Any,
        agent_name: str,
        metadata: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Store a value in shared memory.

        An existing entry under the same key is replaced in the current
        view, but the replaced entry stays in the history.

        Args:
            key: Unique identifier for the data
            value: The data to store (should be JSON-serializable)
            agent_name: Name of the agent storing the data
            metadata: Optional additional context
        """
        # Build the entry before taking the lock to keep the critical
        # section down to the two container updates.
        entry = MemoryEntry(
            key=key,
            value=value,
            agent_name=agent_name,
            metadata=metadata or {},
        )
        with self._lock:
            self._storage[key] = entry
            self._history.append(entry)

    def retrieve(self, key: str, default: Any = None) -> Optional[Any]:
        """
        Retrieve a value from shared memory.

        Args:
            key: The key to look up
            default: Value to return when the key is absent. Pass a
                sentinel here to tell a missing key apart from a stored
                ``None``.

        Returns:
            The stored value, or ``default`` (None unless given) if not found
        """
        with self._lock:
            entry = self._storage.get(key)
        # Test for absence explicitly instead of relying on the entry's
        # truthiness.
        return default if entry is None else entry.value

    def retrieve_entry(self, key: str) -> Optional[MemoryEntry]:
        """
        Retrieve the full memory entry including metadata.

        The returned object is the stored entry itself, not a copy.

        Args:
            key: The key to look up

        Returns:
            The full MemoryEntry, or None if not found
        """
        with self._lock:
            return self._storage.get(key)

    def get_context(self, keys: list[str]) -> dict[str, Any]:
        """
        Retrieve multiple values as a context dictionary.

        Args:
            keys: List of keys to retrieve

        Returns:
            Dictionary mapping keys to their values (missing keys excluded)
        """
        with self._lock:
            return {
                key: self._storage[key].value
                for key in keys
                if key in self._storage
            }

    def get_all(self) -> dict[str, Any]:
        """
        Retrieve all stored values.

        Returns:
            New dictionary mapping all keys to their current values
        """
        with self._lock:
            return {key: entry.value for key, entry in self._storage.items()}

    def get_history(self, agent_name: Optional[str] = None) -> list[MemoryEntry]:
        """
        Get the history of all memory operations.

        Args:
            agent_name: Optional filter by agent name. Only ``None``
                disables the filter; an empty string matches entries
                whose agent name is the empty string.

        Returns:
            New list of memory entries in the order they were stored
        """
        with self._lock:
            # ``is None`` rather than truthiness: "" is a storable agent
            # name and must filter, not return the whole history.
            if agent_name is None:
                return list(self._history)
            return [e for e in self._history if e.agent_name == agent_name]

    def has_key(self, key: str) -> bool:
        """Check if a key exists in memory."""
        with self._lock:
            return key in self._storage

    def clear(self) -> None:
        """Clear all stored data and history."""
        with self._lock:
            self._storage.clear()
            self._history.clear()

    def keys(self) -> list[str]:
        """Get all stored keys as a new list."""
        with self._lock:
            return list(self._storage)