# tostido's picture
# Initial commit: CASCADE Hyperlattice mobile-friendly interactive 3D visualization
# 61247fd
"""
IPFS Collective Memory - Experience Sharing Layer
All quine agents share experiences via IPFS:
- Each experience gets a unique CID
- Agents can query collective memory
- Merkle DAG structure for provenance
"""
import json
import hashlib
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import time
# Try to import IPFS client, fallback to mock if not available
try:
import ipfshttpclient
IPFS_AVAILABLE = True
except ImportError:
IPFS_AVAILABLE = False
@dataclass
class IPFSExperience:
    """An experience record stored in IPFS (or the local mock store)."""
    cid: str  # content identifier under which this record is stored
    agent_id: str  # id of the agent that produced the experience
    merkle_hash: str  # provenance hash of the experience payload
    data: Dict  # arbitrary JSON-serializable experience payload
    timestamp: float  # upload time, seconds since epoch (set via time.time())
    parent_cid: Optional[str] = None  # CID of the parent experience in the DAG, if any
class CollectiveMemory:
    """
    Shared experience pool using IPFS.

    In production: uses an actual IPFS node (via ipfshttpclient).
    In demo / when the node is unreachable: falls back to local mock storage.
    The local store always holds a copy and doubles as a read cache.
    """
    def __init__(self, use_ipfs: bool = True, ipfs_addr: str = "/ip4/127.0.0.1/tcp/5001"):
        """
        Args:
            use_ipfs: Try to connect to a real IPFS node (only honored when the
                ipfshttpclient library is importable); degrades to local mock
                storage on any connection failure.
            ipfs_addr: Multiaddr of the IPFS API endpoint.
        """
        self.use_ipfs = use_ipfs and IPFS_AVAILABLE
        self.ipfs_addr = ipfs_addr
        self.client = None
        # Local storage (always available)
        self.local_store: Dict[str, "IPFSExperience"] = {}
        self.agent_index: Dict[str, List[str]] = {}  # agent_id -> [cids]
        self.merkle_index: Dict[str, str] = {}  # merkle_hash -> cid
        if self.use_ipfs:
            try:
                self.client = ipfshttpclient.connect(ipfs_addr)
                print(f"[IPFS] Connected to {ipfs_addr}")
            except Exception as e:
                # Best-effort: degrade to the local mock rather than crash.
                print(f"[IPFS] Connection failed: {e}, using local mock")
                self.use_ipfs = False

    def upload_experience(self, agent_id: str, merkle_hash: str, data: Dict,
                          parent_cid: Optional[str] = None) -> str:
        """
        Upload an experience to collective memory.

        Args:
            agent_id: Owner of the experience.
            merkle_hash: Provenance hash of the payload.
            data: Arbitrary JSON-serializable experience payload.
            parent_cid: CID of the parent experience (DAG edge), if any.

        Returns:
            CID (content identifier) under which the experience is stored.
        """
        exp_data = {
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'timestamp': time.time(),
            'parent_cid': parent_cid
        }
        if self.use_ipfs and self.client:
            # Real IPFS upload; fall back to a deterministic mock CID on failure
            # so callers always get a usable identifier.
            try:
                cid = self.client.add_json(exp_data)
            except Exception as e:
                print(f"[IPFS] Upload failed: {e}")
                cid = self._mock_cid(exp_data)
        else:
            # Mock CID generation
            cid = self._mock_cid(exp_data)
        exp = IPFSExperience(
            cid=cid,
            agent_id=agent_id,
            merkle_hash=merkle_hash,
            data=data,
            timestamp=exp_data['timestamp'],
            parent_cid=parent_cid
        )
        self._index(exp)
        return cid

    def _index(self, exp: "IPFSExperience") -> None:
        """Store an experience locally and keep both lookup indices in sync."""
        self.local_store[exp.cid] = exp
        self.agent_index.setdefault(exp.agent_id, []).append(exp.cid)
        self.merkle_index[exp.merkle_hash] = exp.cid

    def _mock_cid(self, data: Dict) -> str:
        """
        Generate a deterministic mock CID (used when IPFS is not available).

        Mimics the *shape* of a CIDv0: "Qm" followed by 44 hex characters
        (a real CIDv0 is a base58-encoded multihash; this is only a stand-in
        with the right prefix and length). sort_keys makes the CID independent
        of dict insertion order.
        """
        content = json.dumps(data, sort_keys=True)
        hash_bytes = hashlib.sha256(content.encode()).digest()
        # Double hash kept for output stability with previously issued mock CIDs.
        return "Qm" + hashlib.sha256(hash_bytes).hexdigest()[:44]

    def get_experience(self, cid: str) -> Optional["IPFSExperience"]:
        """Retrieve an experience by CID: local cache first, then IPFS."""
        if cid in self.local_store:
            return self.local_store[cid]
        if self.use_ipfs and self.client:
            try:
                data = self.client.get_json(cid)
                exp = IPFSExperience(
                    cid=cid,
                    agent_id=data['agent_id'],
                    merkle_hash=data['merkle_hash'],
                    data=data['data'],
                    timestamp=data['timestamp'],
                    parent_cid=data.get('parent_cid')
                )
                # Cache AND index the fetched experience so query_by_merkle /
                # get_agent_experiences can see it too (previously it was only
                # cached, leaving the indices stale).
                self._index(exp)
                return exp
            except Exception as e:
                print(f"[IPFS] Get failed for {cid}: {e}")
        return None

    def get_agent_experiences(self, agent_id: str) -> List["IPFSExperience"]:
        """Get all locally known experiences for an agent, in upload order."""
        cids = self.agent_index.get(agent_id, [])
        return [self.local_store[cid] for cid in cids if cid in self.local_store]

    def query_by_merkle(self, merkle_hash: str) -> Optional["IPFSExperience"]:
        """Find an experience by its Merkle hash, or None if unknown."""
        cid = self.merkle_index.get(merkle_hash)
        if cid:
            return self.get_experience(cid)
        return None

    def get_recent(self, n: int = 20) -> List["IPFSExperience"]:
        """Get the n most recent experiences (newest first)."""
        sorted_exps = sorted(
            self.local_store.values(),
            key=lambda e: e.timestamp,
            reverse=True
        )
        return sorted_exps[:n]

    def get_stats(self) -> Dict:
        """Get collective memory statistics."""
        return {
            'total_experiences': len(self.local_store),
            'unique_agents': len(self.agent_index),
            'ipfs_connected': self.use_ipfs and self.client is not None,
            'storage_mode': 'ipfs' if self.use_ipfs else 'local'
        }

    def get_dag_structure(self) -> Dict[str, List[str]]:
        """Get the parent-child DAG structure: parent_cid -> [child cids]."""
        dag: Dict[str, List[str]] = {}
        for cid, exp in self.local_store.items():
            if exp.parent_cid:
                dag.setdefault(exp.parent_cid, []).append(cid)
        return dag

    def export_chain(self, start_cid: str) -> List[Dict]:
        """
        Export a full experience chain starting from a CID, walking parent
        links until the root (or an unknown CID) is reached.

        Returns:
            The chain in chronological order (root first). The visited set
            guards against cycles in malformed DAGs.
        """
        chain = []
        current_cid = start_cid
        visited = set()
        while current_cid and current_cid not in visited:
            visited.add(current_cid)
            exp = self.get_experience(current_cid)
            if exp:
                chain.append({
                    'cid': exp.cid,
                    'agent_id': exp.agent_id,
                    'merkle_hash': exp.merkle_hash,
                    'timestamp': exp.timestamp,
                    'parent_cid': exp.parent_cid
                })
                # Move to parent
                current_cid = exp.parent_cid
            else:
                break
        return chain[::-1]  # Reverse to get chronological order
class SyncManager:
    """
    Coordinates syncing between local agents and collective memory.

    Experiences are buffered via queue_experience() and pushed to the
    CollectiveMemory in a batch by sync().
    """
    def __init__(self, memory: "CollectiveMemory"):
        """
        Args:
            memory: The shared CollectiveMemory instance uploads go to.
        """
        self.memory = memory
        self.pending_uploads: List[Dict] = []  # queued, not yet uploaded
        self.last_sync = time.time()

    def queue_experience(self, agent_id: str, merkle_hash: str,
                         data: Dict, parent_cid: Optional[str] = None):
        """Queue an experience for upload on the next sync()."""
        self.pending_uploads.append({
            'agent_id': agent_id,
            'merkle_hash': merkle_hash,
            'data': data,
            'parent_cid': parent_cid
        })

    def sync(self) -> List[str]:
        """
        Upload all pending experiences. Returns list of CIDs.

        The queue is drained before uploading, so anything queued during the
        loop lands in a fresh list (no mutation-while-iterating), and if an
        upload raises partway through, only the *unprocessed* items are
        re-queued — previously the whole batch stayed queued, so the items
        already uploaded would be uploaded again (duplicated) on retry.
        """
        cids: List[str] = []
        pending, self.pending_uploads = self.pending_uploads, []
        done = 0
        try:
            for item in pending:
                cids.append(self.memory.upload_experience(
                    agent_id=item['agent_id'],
                    merkle_hash=item['merkle_hash'],
                    data=item['data'],
                    parent_cid=item['parent_cid']
                ))
                done += 1
        finally:
            if done < len(pending):
                # Prepend the leftovers so original queue order is preserved.
                self.pending_uploads = pending[done:] + self.pending_uploads
        self.last_sync = time.time()
        return cids

    def get_sync_status(self) -> Dict:
        """Get sync status: queue depth, last sync time, and memory stats."""
        return {
            'pending_uploads': len(self.pending_uploads),
            'last_sync': self.last_sync,
            'memory_stats': self.memory.get_stats()
        }