File size: 8,201 Bytes
61247fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""
IPFS Collective Memory - Experience Sharing Layer

All quine agents share experiences via IPFS:
- Each experience gets a unique CID
- Agents can query collective memory
- Merkle DAG structure for provenance
"""

import json
import hashlib
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import time

# Detect the optional IPFS client; callers fall back to mock storage when absent.
try:
    import ipfshttpclient
except ImportError:
    # Package not installed: CollectiveMemory will use its local mock store.
    IPFS_AVAILABLE = False
else:
    IPFS_AVAILABLE = True


@dataclass
class IPFSExperience:
    """An experience record stored in IPFS (or the local mock store)."""
    cid: str  # content identifier (real IPFS CID, or deterministic mock)
    agent_id: str  # id of the agent that produced this experience
    merkle_hash: str  # provenance hash; indexed for query_by_merkle lookups
    data: Dict  # arbitrary experience payload (JSON-serializable)
    timestamp: float  # upload time, seconds since epoch (time.time())
    parent_cid: Optional[str] = None  # predecessor's CID; links experiences into a DAG


class CollectiveMemory:
    """
    Shared experience pool using IPFS.

    In production: uses an actual IPFS node reachable at *ipfs_addr*.
    In demo: uses local mock storage only.

    A local mirror of every uploaded experience is always kept, so all
    read methods work whether or not IPFS is connected.
    """

    def __init__(self, use_ipfs: bool = True, ipfs_addr: str = "/ip4/127.0.0.1/tcp/5001"):
        """
        Args:
            use_ipfs: Attempt to connect to a real IPFS daemon. Ignored when
                the ``ipfshttpclient`` package is not installed.
            ipfs_addr: Multiaddr of the IPFS HTTP API endpoint.
        """
        self.use_ipfs = use_ipfs and IPFS_AVAILABLE
        self.ipfs_addr = ipfs_addr
        self.client = None

        # Local storage (always available, even when IPFS is connected)
        self.local_store: Dict[str, IPFSExperience] = {}
        self.agent_index: Dict[str, List[str]] = {}  # agent_id -> [cids]
        self.merkle_index: Dict[str, str] = {}  # merkle_hash -> cid

        if self.use_ipfs:
            try:
                self.client = ipfshttpclient.connect(ipfs_addr)
                print(f"[IPFS] Connected to {ipfs_addr}")
            except Exception as e:
                # Daemon unreachable -> degrade gracefully to the local mock.
                print(f"[IPFS] Connection failed: {e}, using local mock")
                self.use_ipfs = False

    def upload_experience(self, agent_id: str, merkle_hash: str, data: Dict,
                         parent_cid: Optional[str] = None) -> str:
        """
        Upload an experience to collective memory.

        The experience is always mirrored locally and indexed by agent id
        and Merkle hash; when IPFS is connected it is also stored remotely.

        Args:
            agent_id: Originating agent identifier.
            merkle_hash: Content hash used for provenance lookups.
            data: JSON-serializable experience payload.
            parent_cid: CID of the predecessor experience, if any.

        Returns:
            CID (content identifier) of the stored experience.
        """
        # Prepare the payload exactly as it would be serialized to IPFS.
        exp_data = {
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'timestamp': time.time(),
            'parent_cid': parent_cid
        }

        if self.use_ipfs and self.client:
            # Real IPFS upload; fall back to a deterministic mock CID on
            # failure so the caller still gets a usable identifier.
            try:
                cid = self.client.add_json(exp_data)
            except Exception as e:
                print(f"[IPFS] Upload failed: {e}")
                cid = self._mock_cid(exp_data)
        else:
            # Mock CID generation
            cid = self._mock_cid(exp_data)

        # Always mirror locally for fast retrieval.
        exp = IPFSExperience(
            cid=cid,
            agent_id=agent_id,
            merkle_hash=merkle_hash,
            data=data,
            timestamp=exp_data['timestamp'],
            parent_cid=parent_cid
        )
        self.local_store[cid] = exp

        # Update secondary indices.
        self.agent_index.setdefault(agent_id, []).append(cid)
        self.merkle_index[merkle_hash] = cid

        return cid

    def _mock_cid(self, data: Dict) -> str:
        """
        Generate a deterministic mock CID (when IPFS is not available).

        NOTE: this produces a CIDv0-*style* string ("Qm" + 44 hex chars),
        not a real base58/base32 CID -- it only needs to be unique and
        stable for identical content.
        """
        content = json.dumps(data, sort_keys=True)
        hash_bytes = hashlib.sha256(content.encode()).digest()
        return "Qm" + hashlib.sha256(hash_bytes).hexdigest()[:44]

    def get_experience(self, cid: str) -> Optional[IPFSExperience]:
        """Retrieve an experience by CID, or None if it cannot be found."""
        # Check the local mirror first -- cheapest path.
        if cid in self.local_store:
            return self.local_store[cid]

        # Fall back to the IPFS network, caching the result locally.
        if self.use_ipfs and self.client:
            try:
                data = self.client.get_json(cid)
                exp = IPFSExperience(
                    cid=cid,
                    agent_id=data['agent_id'],
                    merkle_hash=data['merkle_hash'],
                    data=data['data'],
                    timestamp=data['timestamp'],
                    parent_cid=data.get('parent_cid')
                )
                self.local_store[cid] = exp
                return exp
            except Exception as e:
                print(f"[IPFS] Get failed for {cid}: {e}")

        return None

    def get_agent_experiences(self, agent_id: str) -> List[IPFSExperience]:
        """Get all locally-mirrored experiences uploaded by *agent_id*."""
        cids = self.agent_index.get(agent_id, [])
        return [self.local_store[cid] for cid in cids if cid in self.local_store]

    def query_by_merkle(self, merkle_hash: str) -> Optional[IPFSExperience]:
        """Find an experience by its Merkle hash, or None if unknown."""
        cid = self.merkle_index.get(merkle_hash)
        if cid:
            return self.get_experience(cid)
        return None

    def get_recent(self, n: int = 20) -> List[IPFSExperience]:
        """Get the *n* most recent experiences, newest first."""
        sorted_exps = sorted(
            self.local_store.values(),
            key=lambda e: e.timestamp,
            reverse=True
        )
        return sorted_exps[:n]

    def get_stats(self) -> Dict:
        """Get collective memory statistics (counts and connection state)."""
        return {
            'total_experiences': len(self.local_store),
            'unique_agents': len(self.agent_index),
            'ipfs_connected': self.use_ipfs and self.client is not None,
            'storage_mode': 'ipfs' if self.use_ipfs else 'local'
        }

    def get_dag_structure(self) -> Dict[str, List[str]]:
        """Get the parent-child DAG structure as {parent_cid: [child_cids]}."""
        dag: Dict[str, List[str]] = {}
        for cid, exp in self.local_store.items():
            if exp.parent_cid:
                dag.setdefault(exp.parent_cid, []).append(cid)
        return dag

    def export_chain(self, start_cid: str) -> List[Dict]:
        """
        Export a full experience chain starting from a CID.

        Walks parent links from *start_cid* back to the root (cycle-safe),
        then returns the chain in chronological (oldest-first) order.
        """
        chain = []
        current_cid = start_cid
        visited = set()  # guard against cyclic parent links

        while current_cid and current_cid not in visited:
            visited.add(current_cid)
            exp = self.get_experience(current_cid)
            if exp:
                chain.append({
                    'cid': exp.cid,
                    'agent_id': exp.agent_id,
                    'merkle_hash': exp.merkle_hash,
                    'timestamp': exp.timestamp,
                    'parent_cid': exp.parent_cid
                })
                # Move to parent
                current_cid = exp.parent_cid
            else:
                break

        return chain[::-1]  # Reverse to get chronological order


class SyncManager:
    """
    Coordinates syncing between local agents and collective memory.

    Experiences are buffered in a pending queue and pushed to the shared
    CollectiveMemory in one batch when sync() is called.
    """

    def __init__(self, memory: CollectiveMemory):
        self.memory = memory
        self.pending_uploads: List[Dict] = []
        self.last_sync = time.time()

    def queue_experience(self, agent_id: str, merkle_hash: str,
                        data: Dict, parent_cid: Optional[str] = None):
        """Queue an experience for upload."""
        entry = {
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'parent_cid': parent_cid
        }
        self.pending_uploads.append(entry)

    def sync(self) -> List[str]:
        """Upload all pending experiences. Returns list of CIDs."""
        uploaded = [
            self.memory.upload_experience(
                agent_id=pending['agent_id'],
                merkle_hash=pending['merkle_hash'],
                data=pending['data'],
                parent_cid=pending['parent_cid']
            )
            for pending in self.pending_uploads
        ]

        # Queue drained; record when this batch went out.
        self.pending_uploads = []
        self.last_sync = time.time()
        return uploaded

    def get_sync_status(self) -> Dict:
        """Get sync status."""
        status = {
            'pending_uploads': len(self.pending_uploads),
            'last_sync': self.last_sync,
            'memory_stats': self.memory.get_stats()
        }
        return status