File size: 4,231 Bytes
9281fab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
Shared memory buffer for inter-agent communication in CoDA.

Provides thread-safe storage for agents to exchange context,
results, and feedback during the visualization pipeline.
"""

import threading
from datetime import datetime
from typing import Any, Optional

from pydantic import BaseModel, Field


class MemoryEntry(BaseModel):
    """A single entry in the shared memory."""
    
    key: str
    value: Any
    agent_name: str
    timestamp: datetime = Field(default_factory=datetime.now)
    metadata: dict[str, Any] = Field(default_factory=dict)


class SharedMemory:
    """
    Thread-safe shared memory buffer for agent communication.
    
    Agents can store and retrieve structured data using string keys.
    Each entry tracks the source agent and timestamp for debugging.
    """
    
    def __init__(self) -> None:
        self._storage: dict[str, MemoryEntry] = {}
        self._lock = threading.RLock()
        self._history: list[MemoryEntry] = []
    
    def store(
        self,
        key: str,
        value: Any,
        agent_name: str,
        metadata: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Store a value in shared memory.
        
        Args:
            key: Unique identifier for the data
            value: The data to store (should be JSON-serializable)
            agent_name: Name of the agent storing the data
            metadata: Optional additional context
        """
        entry = MemoryEntry(
            key=key,
            value=value,
            agent_name=agent_name,
            metadata=metadata or {},
        )
        
        with self._lock:
            self._storage[key] = entry
            self._history.append(entry)
    
    def retrieve(self, key: str) -> Optional[Any]:
        """
        Retrieve a value from shared memory.
        
        Args:
            key: The key to look up
            
        Returns:
            The stored value, or None if not found
        """
        with self._lock:
            entry = self._storage.get(key)
            return entry.value if entry else None
    
    def retrieve_entry(self, key: str) -> Optional[MemoryEntry]:
        """
        Retrieve the full memory entry including metadata.
        
        Args:
            key: The key to look up
            
        Returns:
            The full MemoryEntry, or None if not found
        """
        with self._lock:
            return self._storage.get(key)
    
    def get_context(self, keys: list[str]) -> dict[str, Any]:
        """
        Retrieve multiple values as a context dictionary.
        
        Args:
            keys: List of keys to retrieve
            
        Returns:
            Dictionary mapping keys to their values (missing keys excluded)
        """
        with self._lock:
            return {
                key: self._storage[key].value
                for key in keys
                if key in self._storage
            }
    
    def get_all(self) -> dict[str, Any]:
        """
        Retrieve all stored values.
        
        Returns:
            Dictionary mapping all keys to their values
        """
        with self._lock:
            return {key: entry.value for key, entry in self._storage.items()}
    
    def get_history(self, agent_name: Optional[str] = None) -> list[MemoryEntry]:
        """
        Get the history of all memory operations.
        
        Args:
            agent_name: Optional filter by agent name
            
        Returns:
            List of memory entries in chronological order
        """
        with self._lock:
            if agent_name:
                return [e for e in self._history if e.agent_name == agent_name]
            return list(self._history)
    
    def has_key(self, key: str) -> bool:
        """Check if a key exists in memory."""
        with self._lock:
            return key in self._storage
    
    def clear(self) -> None:
        """Clear all stored data and history."""
        with self._lock:
            self._storage.clear()
            self._history.clear()
    
    def keys(self) -> list[str]:
        """Get all stored keys."""
        with self._lock:
            return list(self._storage.keys())