Spaces:
Sleeping
Sleeping
| """ | |
| IPFS Collective Memory - Experience Sharing Layer | |
| All quine agents share experiences via IPFS: | |
| - Each experience gets a unique CID | |
| - Agents can query collective memory | |
| - Merkle DAG structure for provenance | |
| """ | |
| import json | |
| import hashlib | |
| from typing import Dict, List, Optional, Any | |
| from dataclasses import dataclass | |
| import time | |
| # Try to import the IPFS client; fall back to the local mock if it is not available | |
| try: | |
| import ipfshttpclient | |
| IPFS_AVAILABLE = True | |
| except ImportError: | |
| IPFS_AVAILABLE = False | |
@dataclass
class IPFSExperience:
    """One experience record kept in collective memory.

    The class is a dataclass so that it can be built with keyword
    arguments (``IPFSExperience(cid=..., agent_id=..., ...)``), which is
    how the rest of this module constructs it.
    """

    # Content identifier the record is stored under.
    cid: str
    # Identifier of the agent that produced the experience.
    agent_id: str
    # Merkle hash supplied by the uploader; used as a lookup key.
    merkle_hash: str
    # Arbitrary experience payload.
    data: Dict
    # Creation time as seconds since the epoch (``time.time()``).
    timestamp: float
    # CID of the preceding experience in the chain, if any.
    parent_cid: Optional[str] = None
class CollectiveMemory:
    """
    Shared experience pool backed by IPFS with an in-process cache.

    When the ``ipfshttpclient`` package is importable and a node can be
    reached, uploads go to that node. Otherwise the instance runs in
    local mode and generates placeholder CIDs itself. In both modes every
    record is also kept in ``local_store`` and indexed by agent and by
    Merkle hash.
    """

    def __init__(self, use_ipfs: bool = True, ipfs_addr: str = "/ip4/127.0.0.1/tcp/5001"):
        """Set up the local indices and, if requested, connect to IPFS.

        Args:
            use_ipfs: Try to use a real IPFS node. Ignored when the
                ``ipfshttpclient`` import failed at module load.
            ipfs_addr: Multiaddr of the IPFS API endpoint.
        """
        self.use_ipfs = use_ipfs and IPFS_AVAILABLE
        self.ipfs_addr = ipfs_addr
        self.client = None

        # Local storage (always available)
        self.local_store: Dict[str, "IPFSExperience"] = {}
        self.agent_index: Dict[str, List[str]] = {}  # agent_id -> [cids]
        self.merkle_index: Dict[str, str] = {}  # merkle_hash -> cid

        if self.use_ipfs:
            # Best effort: any connection problem downgrades to local mode.
            try:
                self.client = ipfshttpclient.connect(ipfs_addr)
                print(f"[IPFS] Connected to {ipfs_addr}")
            except Exception as e:
                print(f"[IPFS] Connection failed: {e}, using local mock")
                self.use_ipfs = False

    def _remember(self, exp: "IPFSExperience", *, replace_merkle: bool) -> None:
        """Cache *exp* locally and keep both indices in step with the cache.

        Args:
            exp: Record to cache.
            replace_merkle: If true, the Merkle index entry for this hash
                is overwritten (used for fresh uploads). If false, an
                existing entry is kept (used for records fetched later, so
                a fetch never redirects a hash away from a newer upload).
        """
        first_time = exp.cid not in self.local_store
        self.local_store[exp.cid] = exp

        # Only index a CID once so per-agent listings contain no duplicates.
        if first_time:
            self.agent_index.setdefault(exp.agent_id, []).append(exp.cid)

        if replace_merkle:
            self.merkle_index[exp.merkle_hash] = exp.cid
        else:
            self.merkle_index.setdefault(exp.merkle_hash, exp.cid)

    def upload_experience(self, agent_id: str, merkle_hash: str, data: Dict,
                          parent_cid: Optional[str] = None) -> str:
        """
        Upload an experience to collective memory.

        The record is sent to IPFS when a client is connected. If that
        upload fails, or no client is available, a mock CID is generated
        instead. The record is always cached and indexed locally.

        Args:
            agent_id: Agent that produced the experience.
            merkle_hash: Merkle hash identifying the experience.
            data: Experience payload (must be JSON-serialisable).
            parent_cid: CID of the preceding experience, if any.

        Returns: CID (content identifier)

        Raises:
            TypeError, ValueError: From ``json.dumps`` when the mock CID
                path is taken and the payload cannot be serialised.
        """
        payload = {
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'timestamp': time.time(),
            'parent_cid': parent_cid
        }

        if self.use_ipfs and self.client:
            # Real IPFS upload, falling back to a mock CID on any failure.
            try:
                cid = self.client.add_json(payload)
            except Exception as e:
                print(f"[IPFS] Upload failed: {e}")
                cid = self._mock_cid(payload)
        else:
            cid = self._mock_cid(payload)

        exp = IPFSExperience(
            cid=cid,
            agent_id=agent_id,
            merkle_hash=merkle_hash,
            data=data,
            timestamp=payload['timestamp'],
            parent_cid=parent_cid
        )
        self._remember(exp, replace_merkle=True)
        return cid

    def _mock_cid(self, data: Dict) -> str:
        """Generate a mock CID (when IPFS is not available).

        The payload is serialised as key-sorted JSON and hashed with
        SHA-256 twice. The first 44 hex characters of the second digest
        are prefixed with ``"Qm"``. The result only resembles a
        CIDv0-style identifier; it is not a valid multihash.
        """
        content = json.dumps(data, sort_keys=True)
        hash_bytes = hashlib.sha256(content.encode()).digest()
        return "Qm" + hashlib.sha256(hash_bytes).hexdigest()[:44]

    def get_experience(self, cid: str) -> Optional["IPFSExperience"]:
        """Retrieve an experience by CID.

        The local cache is consulted first. On a miss, and only when an
        IPFS client is connected, the record is fetched from IPFS, then
        cached and indexed so that later per-agent and per-hash queries
        can see it.

        Returns:
            The record, or ``None`` if it is unknown or the fetch failed.
        """
        cached = self.local_store.get(cid)
        if cached is not None:
            return cached

        if not (self.use_ipfs and self.client):
            return None

        # Fetch errors and malformed payloads are both reported and
        # treated as "not found".
        try:
            data = self.client.get_json(cid)
            exp = IPFSExperience(
                cid=cid,
                agent_id=data['agent_id'],
                merkle_hash=data['merkle_hash'],
                data=data['data'],
                timestamp=data['timestamp'],
                parent_cid=data.get('parent_cid')
            )
        except Exception as e:
            print(f"[IPFS] Get failed for {cid}: {e}")
            return None

        self._remember(exp, replace_merkle=False)
        return exp

    def get_agent_experiences(self, agent_id: str) -> List["IPFSExperience"]:
        """Get all locally known experiences for an agent, in index order."""
        cids = self.agent_index.get(agent_id, [])
        return [self.local_store[cid] for cid in cids if cid in self.local_store]

    def query_by_merkle(self, merkle_hash: str) -> Optional["IPFSExperience"]:
        """Find an experience by its Merkle hash, or return ``None``."""
        cid = self.merkle_index.get(merkle_hash)
        if not cid:
            return None
        return self.get_experience(cid)

    def get_recent(self, n: int = 20) -> List["IPFSExperience"]:
        """Get the n most recent locally cached experiences, newest first.

        A non-positive ``n`` yields an empty list.
        """
        # Guard: a negative slice bound would drop items from the end
        # instead of returning nothing.
        if n <= 0:
            return []
        newest_first = sorted(
            self.local_store.values(),
            key=lambda e: e.timestamp,
            reverse=True
        )
        return newest_first[:n]

    def get_stats(self) -> Dict:
        """Get collective memory statistics."""
        return {
            'total_experiences': len(self.local_store),
            'unique_agents': len(self.agent_index),
            'ipfs_connected': bool(self.use_ipfs and self.client is not None),
            'storage_mode': 'ipfs' if self.use_ipfs else 'local'
        }

    def get_dag_structure(self) -> Dict[str, List[str]]:
        """Get the parent-child DAG structure.

        Returns:
            Mapping of parent CID to the CIDs of its locally cached
            children. Records without a parent add no entry.
        """
        dag: Dict[str, List[str]] = {}
        for cid, exp in self.local_store.items():
            if exp.parent_cid:
                dag.setdefault(exp.parent_cid, []).append(cid)
        return dag

    def export_chain(self, start_cid: str) -> List[Dict]:
        """Export the experience chain that ends at ``start_cid``.

        Parent links are followed from ``start_cid`` until a record has no
        parent, cannot be retrieved, or a CID repeats (cycle guard).

        Returns:
            Record summaries (without the payload), ordered from the
            oldest ancestor to ``start_cid``.
        """
        chain = []
        visited = set()
        current_cid = start_cid

        while current_cid and current_cid not in visited:
            visited.add(current_cid)
            exp = self.get_experience(current_cid)
            if exp is None:
                break
            chain.append({
                'cid': exp.cid,
                'agent_id': exp.agent_id,
                'merkle_hash': exp.merkle_hash,
                'timestamp': exp.timestamp,
                'parent_cid': exp.parent_cid
            })
            # Move to parent
            current_cid = exp.parent_cid

        # Collected child-first; reverse to get chronological order.
        return chain[::-1]
class SyncManager:
    """
    Coordinates syncing between local agents and collective memory.

    Experiences are queued with ``queue_experience`` and pushed to the
    memory in one batch by ``sync``.
    """

    def __init__(self, memory: "CollectiveMemory"):
        """Bind the manager to *memory* and start with an empty queue."""
        self.memory = memory
        self.pending_uploads: List[Dict] = []
        self.last_sync = time.time()

    def queue_experience(self, agent_id: str, merkle_hash: str,
                         data: Dict, parent_cid: Optional[str] = None) -> None:
        """Queue an experience for upload on the next ``sync``."""
        self.pending_uploads.append({
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'parent_cid': parent_cid
        })

    def sync(self) -> List[str]:
        """Upload all pending experiences. Returns list of CIDs.

        Items are uploaded in queue order. If an upload raises, the
        exception propagates. Items already uploaded are removed from the
        queue, so a later ``sync`` does not upload them a second time. The
        failing item and everything after it stay queued. ``last_sync`` is
        only updated when the whole batch succeeds.
        """
        cids: List[str] = []
        try:
            for item in self.pending_uploads:
                cids.append(self.memory.upload_experience(
                    agent_id=item['agent_id'],
                    merkle_hash=item['merkle_hash'],
                    data=item['data'],
                    parent_cid=item['parent_cid']
                ))
        finally:
            # Drop exactly the items that made it. On full success this
            # leaves an empty queue.
            self.pending_uploads = self.pending_uploads[len(cids):]

        self.last_sync = time.time()
        return cids

    def get_sync_status(self) -> Dict:
        """Get sync status: queue length, last sync time and memory stats."""
        return {
            'pending_uploads': len(self.pending_uploads),
            'last_sync': self.last_sync,
            'memory_stats': self.memory.get_stats()
        }