"""
IPFS Collective Memory - Experience Sharing Layer

All quine agents share experiences via IPFS:
- Each experience gets a unique CID
- Agents can query collective memory
- Merkle DAG structure for provenance
"""

import json
import hashlib
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import time

# Try to import IPFS client, fallback to mock if not available
try:
    import ipfshttpclient
    IPFS_AVAILABLE = True
except ImportError:
    IPFS_AVAILABLE = False


@dataclass
class IPFSExperience:
    """An experience stored in IPFS."""

    cid: str                          # content identifier (real or mock)
    agent_id: str                     # agent that produced the experience
    merkle_hash: str                  # caller-supplied Merkle hash
    data: Dict                        # experience payload
    timestamp: float                  # time.time() captured at upload
    parent_cid: Optional[str] = None  # CID of the parent experience, if any


class CollectiveMemory:
    """
    Shared experience pool using IPFS.

    In production: Uses actual IPFS node
    In demo: Uses local mock storage

    Every experience that passes through this object (uploaded or fetched)
    is kept in ``local_store`` and indexed by agent id and by Merkle hash.
    """

    def __init__(self, use_ipfs: bool = True,
                 ipfs_addr: str = "/ip4/127.0.0.1/tcp/5001"):
        """
        Args:
            use_ipfs: Request a real IPFS connection. Ignored (treated as
                False) when ``ipfshttpclient`` could not be imported.
            ipfs_addr: Multiaddr passed to ``ipfshttpclient.connect``.
        """
        self.use_ipfs = use_ipfs and IPFS_AVAILABLE
        self.ipfs_addr = ipfs_addr
        self.client = None

        # Local storage (always available)
        self.local_store: Dict[str, IPFSExperience] = {}
        self.agent_index: Dict[str, List[str]] = {}  # agent_id -> [cids]
        self.merkle_index: Dict[str, str] = {}       # merkle_hash -> cid

        if self.use_ipfs:
            try:
                self.client = ipfshttpclient.connect(ipfs_addr)
                print(f"[IPFS] Connected to {ipfs_addr}")
            except Exception as e:
                # Best effort: a failed connection degrades to local mode.
                print(f"[IPFS] Connection failed: {e}, using local mock")
                self.use_ipfs = False

    def _remember(self, exp: IPFSExperience,
                  overwrite_merkle: bool = True) -> None:
        """Store *exp* locally and keep both indices in sync with the store.

        Args:
            exp: Experience to cache.
            overwrite_merkle: When True the Merkle index entry is replaced
                (upload semantics: the latest upload wins). When False an
                existing entry for the same Merkle hash is left untouched.
        """
        self.local_store[exp.cid] = exp

        agent_cids = self.agent_index.setdefault(exp.agent_id, [])
        # Avoid listing the same CID twice for one agent.
        if exp.cid not in agent_cids:
            agent_cids.append(exp.cid)

        if overwrite_merkle:
            self.merkle_index[exp.merkle_hash] = exp.cid
        else:
            self.merkle_index.setdefault(exp.merkle_hash, exp.cid)

    def upload_experience(self, agent_id: str, merkle_hash: str,
                          data: Dict,
                          parent_cid: Optional[str] = None) -> str:
        """
        Upload an experience to collective memory.

        Args:
            agent_id: Identifier of the agent that owns the experience.
            merkle_hash: Merkle hash to index the experience under.
            data: Experience payload.
            parent_cid: CID of the parent experience, if any.

        Returns:
            CID (content identifier)

        Raises:
            TypeError: If a mock CID has to be generated and the payload is
                not JSON-serializable.
        """
        # Prepare the data
        exp_data = {
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'timestamp': time.time(),
            'parent_cid': parent_cid
        }

        if self.use_ipfs and self.client:
            # Real IPFS upload; fall back to a mock CID if it fails.
            try:
                cid = self.client.add_json(exp_data)
            except Exception as e:
                print(f"[IPFS] Upload failed: {e}")
                cid = self._mock_cid(exp_data)
        else:
            # Mock CID generation
            cid = self._mock_cid(exp_data)

        # Store locally and update indices
        self._remember(IPFSExperience(
            cid=cid,
            agent_id=agent_id,
            merkle_hash=merkle_hash,
            data=data,
            timestamp=exp_data['timestamp'],
            parent_cid=parent_cid
        ))

        return cid

    def _mock_cid(self, data: Dict) -> str:
        """Generate a mock CID (when IPFS not available).

        The result is "Qm" followed by 44 hex characters (46 characters in
        total). It is derived from a double SHA-256 of the key-sorted JSON
        encoding of *data*; it is not a real multihash.
        """
        content = json.dumps(data, sort_keys=True)
        hash_bytes = hashlib.sha256(content.encode()).digest()
        return "Qm" + hashlib.sha256(hash_bytes).hexdigest()[:44]

    def get_experience(self, cid: str) -> Optional[IPFSExperience]:
        """Retrieve an experience by CID.

        The local store is consulted first. On a miss, and only when an IPFS
        client is connected, the record is fetched, cached and indexed so it
        is visible to the agent and Merkle lookups as well.

        Returns:
            The experience, or None if it is unknown or the fetch failed.
        """
        # Check local first
        cached = self.local_store.get(cid)
        if cached is not None:
            return cached

        # Try IPFS
        if self.use_ipfs and self.client:
            try:
                data = self.client.get_json(cid)
                exp = IPFSExperience(
                    cid=cid,
                    agent_id=data['agent_id'],
                    merkle_hash=data['merkle_hash'],
                    data=data['data'],
                    timestamp=data['timestamp'],
                    parent_cid=data.get('parent_cid')
                )
            except Exception as e:
                # Covers transport errors as well as malformed records.
                print(f"[IPFS] Get failed for {cid}: {e}")
            else:
                # Do not redirect an existing Merkle mapping on a mere fetch.
                self._remember(exp, overwrite_merkle=False)
                return exp

        return None

    def get_agent_experiences(self, agent_id: str) -> List[IPFSExperience]:
        """Get all locally known experiences for an agent, in index order."""
        cids = self.agent_index.get(agent_id, [])
        return [self.local_store[cid] for cid in cids
                if cid in self.local_store]

    def query_by_merkle(self, merkle_hash: str) -> Optional[IPFSExperience]:
        """Find experience by its Merkle hash, or None if it is not indexed."""
        cid = self.merkle_index.get(merkle_hash)
        if cid is None:
            return None
        return self.get_experience(cid)

    def get_recent(self, n: int = 20) -> List[IPFSExperience]:
        """Get n most recent experiences, newest first.

        A non-positive *n* yields an empty list.
        """
        if n <= 0:
            return []
        sorted_exps = sorted(
            self.local_store.values(),
            key=lambda e: e.timestamp,
            reverse=True
        )
        return sorted_exps[:n]

    def get_stats(self) -> Dict:
        """Get collective memory statistics."""
        return {
            'total_experiences': len(self.local_store),
            'unique_agents': len(self.agent_index),
            'ipfs_connected': self.use_ipfs and self.client is not None,
            'storage_mode': 'ipfs' if self.use_ipfs else 'local'
        }

    def get_dag_structure(self) -> Dict[str, List[str]]:
        """Get the parent-child DAG structure.

        Returns:
            Mapping of parent CID -> list of child CIDs, built from the
            locally stored experiences that have a parent.
        """
        dag: Dict[str, List[str]] = {}
        for cid, exp in self.local_store.items():
            if exp.parent_cid:
                dag.setdefault(exp.parent_cid, []).append(cid)
        return dag

    def export_chain(self, start_cid: str) -> List[Dict]:
        """Export a full experience chain starting from a CID.

        Follows ``parent_cid`` links from *start_cid* until a root, an
        unknown CID or an already visited CID is reached.

        Returns:
            Chain entries ordered from the oldest ancestor to *start_cid*.
        """
        chain = []
        current_cid = start_cid
        visited = set()  # guards against cycles in parent links

        while current_cid and current_cid not in visited:
            visited.add(current_cid)
            exp = self.get_experience(current_cid)
            if exp is None:
                break
            chain.append({
                'cid': exp.cid,
                'agent_id': exp.agent_id,
                'merkle_hash': exp.merkle_hash,
                'timestamp': exp.timestamp,
                'parent_cid': exp.parent_cid
            })
            # Move to parent
            current_cid = exp.parent_cid

        return chain[::-1]  # Reverse to get chronological order


class SyncManager:
    """
    Coordinates syncing between local agents and collective memory.
    """

    def __init__(self, memory: CollectiveMemory):
        """
        Args:
            memory: Collective memory that queued experiences are uploaded to.
        """
        self.memory = memory
        self.pending_uploads: List[Dict] = []
        self.last_sync = time.time()

    def queue_experience(self, agent_id: str, merkle_hash: str,
                         data: Dict, parent_cid: Optional[str] = None):
        """Queue an experience for upload."""
        self.pending_uploads.append({
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'parent_cid': parent_cid
        })

    def sync(self) -> List[str]:
        """Upload all pending experiences. Returns list of CIDs.

        If an upload raises, the exception propagates, but the items that
        were already uploaded are removed from the queue so that a later
        ``sync()`` does not upload them a second time. ``last_sync`` is only
        advanced when the whole queue was processed.
        """
        cids = []
        uploaded = 0
        try:
            for item in self.pending_uploads:
                cid = self.memory.upload_experience(
                    agent_id=item['agent_id'],
                    merkle_hash=item['merkle_hash'],
                    data=item['data'],
                    parent_cid=item['parent_cid']
                )
                cids.append(cid)
                uploaded += 1
        finally:
            # Keep only what has not been uploaded yet (empty on success).
            self.pending_uploads = self.pending_uploads[uploaded:]

        self.last_sync = time.time()
        return cids

    def get_sync_status(self) -> Dict:
        """Get sync status."""
        return {
            'pending_uploads': len(self.pending_uploads),
            'last_sync': self.last_sync,
            'memory_stats': self.memory.get_stats()
        }