Spaces:
Sleeping
Sleeping
File size: 4,231 Bytes
9281fab | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | """
Shared memory buffer for inter-agent communication in CoDA.
Provides thread-safe storage for agents to exchange context,
results, and feedback during the visualization pipeline.
"""
import threading
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field
class MemoryEntry(BaseModel):
"""A single entry in the shared memory."""
key: str
value: Any
agent_name: str
timestamp: datetime = Field(default_factory=datetime.now)
metadata: dict[str, Any] = Field(default_factory=dict)
class SharedMemory:
"""
Thread-safe shared memory buffer for agent communication.
Agents can store and retrieve structured data using string keys.
Each entry tracks the source agent and timestamp for debugging.
"""
def __init__(self) -> None:
self._storage: dict[str, MemoryEntry] = {}
self._lock = threading.RLock()
self._history: list[MemoryEntry] = []
def store(
self,
key: str,
value: Any,
agent_name: str,
metadata: Optional[dict[str, Any]] = None,
) -> None:
"""
Store a value in shared memory.
Args:
key: Unique identifier for the data
value: The data to store (should be JSON-serializable)
agent_name: Name of the agent storing the data
metadata: Optional additional context
"""
entry = MemoryEntry(
key=key,
value=value,
agent_name=agent_name,
metadata=metadata or {},
)
with self._lock:
self._storage[key] = entry
self._history.append(entry)
def retrieve(self, key: str) -> Optional[Any]:
"""
Retrieve a value from shared memory.
Args:
key: The key to look up
Returns:
The stored value, or None if not found
"""
with self._lock:
entry = self._storage.get(key)
return entry.value if entry else None
def retrieve_entry(self, key: str) -> Optional[MemoryEntry]:
"""
Retrieve the full memory entry including metadata.
Args:
key: The key to look up
Returns:
The full MemoryEntry, or None if not found
"""
with self._lock:
return self._storage.get(key)
def get_context(self, keys: list[str]) -> dict[str, Any]:
"""
Retrieve multiple values as a context dictionary.
Args:
keys: List of keys to retrieve
Returns:
Dictionary mapping keys to their values (missing keys excluded)
"""
with self._lock:
return {
key: self._storage[key].value
for key in keys
if key in self._storage
}
def get_all(self) -> dict[str, Any]:
"""
Retrieve all stored values.
Returns:
Dictionary mapping all keys to their values
"""
with self._lock:
return {key: entry.value for key, entry in self._storage.items()}
def get_history(self, agent_name: Optional[str] = None) -> list[MemoryEntry]:
"""
Get the history of all memory operations.
Args:
agent_name: Optional filter by agent name
Returns:
List of memory entries in chronological order
"""
with self._lock:
if agent_name:
return [e for e in self._history if e.agent_name == agent_name]
return list(self._history)
def has_key(self, key: str) -> bool:
"""Check if a key exists in memory."""
with self._lock:
return key in self._storage
def clear(self) -> None:
"""Clear all stored data and history."""
with self._lock:
self._storage.clear()
self._history.clear()
def keys(self) -> list[str]:
"""Get all stored keys."""
with self._lock:
return list(self._storage.keys())
|