# Source: Cascade / cascade/store.py
# Author: tostido
# Commit: 77bcbf1 - "Initial commit - cascade-lattice 0.5.4"
"""
CASCADE Store - Simple observe/query interface with HuggingFace
The goal: make it as easy as possible to store and retrieve provenance.
from cascade.store import observe, query
# Write - saves locally + syncs to HuggingFace
receipt = observe(model_id="crafter", data={"reward": 2.1, "step": 100})
print(receipt.cid) # bafyrei... (content hash for verification)
# Read - queries the local store; results are Receipt objects
past = query(model_id="crafter")
for obs in past:
    print(obs.data["reward"], obs.cid)
Architecture:
1. Local SQLite index (fast queries)
2. CBOR files for full data (content-addressed)
3. HuggingFace datasets for sync (unlimited, free)
4. CIDs computed for verification (no IPFS daemon needed)
Dual-write model:
- User's own HF dataset (they own their data)
- Central cascade-observations dataset (Dreamer sees everything)
No daemon. No server. Just `huggingface-cli login` once.
"""
import hashlib
import json
import os
import sqlite3
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
import dag_cbor
from multiformats import CID, multihash
from cascade.genesis import get_genesis_root
# =============================================================================
# CONSTANTS
# =============================================================================

# Central HuggingFace dataset; sync_observation() always uploads here
# (this is the dataset Dreamer reads).
CENTRAL_DATASET = "tostido/cascade-observations"

# Optional user-owned dataset for dual-write. None unless the
# CASCADE_USER_DATASET environment variable is set.
USER_DATASET = os.getenv("CASCADE_USER_DATASET")

# Root of the local cache: ~/.cascade/lattice
DEFAULT_LATTICE_DIR = Path.home().joinpath(".cascade", "lattice")

# Public IPFS gateways, tried in this order by fetch_from_gateway().
IPFS_GATEWAYS = [
    "https://ipfs.io/ipfs/",
    "https://dweb.link/ipfs/",
    "https://gateway.pinata.cloud/ipfs/",
]
# =============================================================================
# DATA STRUCTURES
# =============================================================================
@dataclass
class Receipt:
    """
    Provenance record for a single observation.

    A receipt is persisted as DAG-CBOR and addressed by the CID of those
    bytes (see ``compute_cid``), so anyone holding the bytes can recompute
    and check the identifier.
    """

    cid: str  # content identifier (CID string) of the stored encoding
    model_id: str  # model/system the observation belongs to
    merkle_root: str  # short legacy hash kept for compatibility
    timestamp: float  # observation time (observe() fills this from time.time())
    data: Dict[str, Any]  # the observation payload
    parent_cid: Optional[str] = None  # CID of the preceding observation, if chained

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict (``data`` is shared, not copied)."""
        return dict(
            cid=self.cid,
            model_id=self.model_id,
            merkle_root=self.merkle_root,
            timestamp=self.timestamp,
            data=self.data,
            parent_cid=self.parent_cid,
        )

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Receipt":
        """
        Rebuild a receipt from a ``to_dict``-shaped mapping.

        All keys except ``parent_cid`` are required (KeyError otherwise);
        a missing ``parent_cid`` becomes None.
        """
        return cls(
            d["cid"],
            d["model_id"],
            d["merkle_root"],
            d["timestamp"],
            d["data"],
            parent_cid=d.get("parent_cid"),
        )
# =============================================================================
# CONTENT ADDRESSING
# =============================================================================
def compute_cid(data: bytes) -> str:
    """
    Return the CIDv1 string (base32 multibase) for a DAG-CBOR payload.

    The CID combines the ``dag-cbor`` codec with a sha2-256 multihash of
    ``data``. It is computed locally, with no IPFS node involved.
    """
    wrapped_digest = multihash.wrap(hashlib.sha256(data).digest(), "sha2-256")
    return str(CID("base32", 1, "dag-cbor", wrapped_digest))
def data_to_cid(data: Dict[str, Any]) -> tuple[str, bytes]:
    """Encode ``data`` as DAG-CBOR and return ``(cid, encoded_bytes)``."""
    payload = dag_cbor.encode(data)
    return compute_cid(payload), payload
# =============================================================================
# LOCAL STORE
# =============================================================================
class LocalStore:
    """
    SQLite-backed local store with content-addressed CBOR files.

    Layout under ``lattice_dir``:
      - ``index.db``: SQLite index of observations (fast filtered queries);
      - ``cbor/<cid>.cbor``: the DAG-CBOR encoding of each receipt, named by
        the CID computed over those bytes.

    Every method opens its own short-lived SQLite connection and closes it
    even when a statement raises.
    """

    def __init__(self, lattice_dir: Optional[Path] = None):
        """
        Create the directory layout and SQLite schema if they do not exist.

        Args:
            lattice_dir: Root directory of the store, as a ``Path`` or a
                ``str``. Falls back to ``DEFAULT_LATTICE_DIR`` when falsy.
        """
        # Path() accepts both str and Path, so plain string paths work too.
        self.lattice_dir = Path(lattice_dir) if lattice_dir else DEFAULT_LATTICE_DIR
        self.lattice_dir.mkdir(parents=True, exist_ok=True)
        self.cbor_dir = self.lattice_dir / "cbor"
        self.cbor_dir.mkdir(exist_ok=True)
        self.db_path = self.lattice_dir / "index.db"
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the index database; the caller closes it."""
        return sqlite3.connect(self.db_path)

    def _init_db(self):
        """Create the ``observations`` table and its indexes if missing."""
        conn = self._connect()
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS observations (
                    cid TEXT PRIMARY KEY,
                    model_id TEXT NOT NULL,
                    merkle_root TEXT NOT NULL,
                    timestamp REAL NOT NULL,
                    parent_cid TEXT,
                    pinned INTEGER DEFAULT 0,
                    created_at REAL DEFAULT (strftime('%s', 'now'))
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_model_id ON observations(model_id)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp ON observations(timestamp DESC)
            """)
            conn.commit()
        finally:
            conn.close()

    def save(self, receipt: "Receipt") -> str:
        """
        Persist a receipt and index it.

        The CID is computed over ``receipt.to_dict()`` as it stands when this
        is called (including the current ``receipt.cid`` value). The encoded
        bytes are written to ``cbor/<cid>.cbor``, the index row is upserted,
        and ``receipt.cid`` is updated to the computed CID.

        Returns:
            The computed CID.
        """
        cid, encoded = data_to_cid(receipt.to_dict())
        (self.cbor_dir / f"{cid}.cbor").write_bytes(encoded)

        # Update receipt with computed CID (may differ if data changed)
        receipt.cid = cid

        conn = self._connect()
        try:
            conn.execute("""
                INSERT OR REPLACE INTO observations
                (cid, model_id, merkle_root, timestamp, parent_cid)
                VALUES (?, ?, ?, ?, ?)
            """, (cid, receipt.model_id, receipt.merkle_root,
                  receipt.timestamp, receipt.parent_cid))
            conn.commit()
        finally:
            conn.close()
        return cid

    def get(self, cid: str) -> Optional["Receipt"]:
        """
        Load a receipt from ``cbor/<cid>.cbor``.

        Returns:
            The receipt with ``.cid`` set to ``cid``, or None if no file exists.
        """
        cbor_path = self.cbor_dir / f"{cid}.cbor"
        if not cbor_path.exists():
            return None
        receipt = Receipt.from_dict(dag_cbor.decode(cbor_path.read_bytes()))
        # The payload is encoded before its CID is known (observe() saves
        # receipts with cid=""), so the embedded "cid" field is not the
        # content address. The filename is, so restore it here. Otherwise
        # query() results and parent chaining would all see an empty CID.
        receipt.cid = cid
        return receipt

    def query(
        self,
        model_id: Optional[str] = None,
        since: Optional[float] = None,
        limit: int = 100,
    ) -> List["Receipt"]:
        """
        Return receipts newest-first, optionally filtered.

        Args:
            model_id: Only this model's observations (ignored when falsy).
            since: Only observations with ``timestamp > since`` (ignored when
                None; ``0.0`` is a real filter value).
            limit: Maximum number of index rows to consider.

        Index rows whose CBOR file is missing are skipped.
        """
        sql = "SELECT cid FROM observations WHERE 1=1"
        params: List[Any] = []
        if model_id:
            sql += " AND model_id = ?"
            params.append(model_id)
        if since is not None:
            sql += " AND timestamp > ?"
            params.append(since)
        sql += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        conn = self._connect()
        try:
            cids = [row[0] for row in conn.execute(sql, params).fetchall()]
        finally:
            conn.close()

        # Load full receipts
        receipts = []
        for cid in cids:
            receipt = self.get(cid)
            if receipt:
                receipts.append(receipt)
        return receipts

    def get_latest(self, model_id: str) -> Optional["Receipt"]:
        """Return the most recent receipt for ``model_id``, or None."""
        results = self.query(model_id=model_id, limit=1)
        return results[0] if results else None

    def count(self, model_id: Optional[str] = None) -> int:
        """Count indexed observations, optionally for a single model."""
        conn = self._connect()
        try:
            if model_id:
                cursor = conn.execute(
                    "SELECT COUNT(*) FROM observations WHERE model_id = ?",
                    (model_id,),
                )
            else:
                cursor = conn.execute("SELECT COUNT(*) FROM observations")
            return cursor.fetchone()[0]
        finally:
            conn.close()
# =============================================================================
# IPFS GATEWAY ACCESS
# =============================================================================
def fetch_from_gateway(cid: str, timeout: float = 10.0) -> Optional[bytes]:
    """
    Try each entry of ``IPFS_GATEWAYS`` in order and return the first body
    served with HTTP 200.

    Any request error moves on to the next gateway. ``requests`` is imported
    lazily, so it is only needed when this is called. No IPFS daemon is
    required.

    Returns:
        The response bytes, or None if every gateway failed.
    """
    import requests

    for base_url in IPFS_GATEWAYS:
        try:
            response = requests.get(f"{base_url}{cid}", timeout=timeout)
            if response.status_code != 200:
                continue
            return response.content
        except Exception:
            continue
    return None
def fetch_receipt(cid: str, local_store: Optional[LocalStore] = None) -> Optional[Receipt]:
    """
    Fetch a receipt by CID: local store first, then public IPFS gateways.

    Gateway responses are untrusted. A payload is accepted only when its
    recomputed CID (``compute_cid``) equals ``cid``. Accepted receipts are
    cached in ``local_store`` when one is given.

    Args:
        cid: CID of the receipt, in the form produced by ``compute_cid``.
        local_store: Optional store to check first and to cache into.

    Returns:
        The receipt, with ``.cid`` set to ``cid``. None if it is not found,
        fails verification, or cannot be decoded.
    """
    # Check local first
    if local_store:
        cached = local_store.get(cid)
        if cached:
            return cached

    payload = fetch_from_gateway(cid)
    if not payload:
        return None

    # Content addressing only protects us if we check it: a gateway could
    # serve arbitrary bytes under this CID.
    if compute_cid(payload) != cid:
        return None

    try:
        receipt = Receipt.from_dict(dag_cbor.decode(payload))
        # Cache locally
        if local_store:
            local_store.save(receipt)
    except Exception:
        # Undecodable payload or failed cache write: treat as not found.
        return None

    # The embedded "cid" field is written before the CID is known, so report
    # the verified content address instead.
    receipt.cid = cid
    return receipt
# =============================================================================
# HUGGINGFACE SYNC - Unlimited free storage
# =============================================================================
# (Uses constants from top of file: CENTRAL_DATASET, USER_DATASET)
def _upload_to_hf(filepath: Path, cid: str, dataset_id: str) -> bool:
    """
    Upload one CBOR file to ``observations/<cid>.cbor`` in a HuggingFace
    dataset repo.

    This is best-effort: any failure, including ``huggingface_hub`` not
    being installed, is reported as False instead of raised.
    """
    try:
        from huggingface_hub import HfApi

        HfApi().upload_file(
            path_or_fileobj=str(filepath),
            path_in_repo=f"observations/{cid}.cbor",
            repo_id=dataset_id,
            repo_type="dataset",
        )
    except Exception:
        return False
    return True
def sync_observation(cid: str, filepath: Path) -> dict:
    """
    Upload one observation file to HuggingFace (dual-write).

    1. ``CENTRAL_DATASET`` (always) - the shared dataset Dreamer reads.
    2. ``USER_DATASET`` (only when CASCADE_USER_DATASET is set) - the
       user's own copy.

    Returns:
        ``{"central": bool, "user": bool}``: per-target upload success.
        "user" stays False when no user dataset is configured.
    """
    central_ok = _upload_to_hf(filepath, cid, CENTRAL_DATASET)
    user_ok = _upload_to_hf(filepath, cid, USER_DATASET) if USER_DATASET else False
    return {"central": central_ok, "user": user_ok}
def sync_all() -> dict:
    """
    Push every local CBOR file to HuggingFace via ``sync_observation``.

    A file counts as synced when its upload to the central dataset succeeds.
    Its index row is then marked ``pinned = 1``. The outcome of the optional
    user-dataset upload does not affect the counts.

    Returns:
        {"synced": count, "failed": count}
    """
    store = _get_store()
    cbor_files = list(store.cbor_dir.glob("*.cbor"))
    synced = 0
    failed = 0

    # One connection for the whole run instead of one per file. It is closed
    # even if an upload or UPDATE raises. Committing per file keeps the
    # pinned flags durable if the run is interrupted.
    conn = sqlite3.connect(store.db_path)
    try:
        for cbor_path in cbor_files:
            cid = cbor_path.stem
            if sync_observation(cid, cbor_path)["central"]:
                synced += 1
                conn.execute("UPDATE observations SET pinned = 1 WHERE cid = ?", (cid,))
                conn.commit()
            else:
                failed += 1
    finally:
        conn.close()

    return {"synced": synced, "failed": failed}
def pull_from_hf(dataset_id: str = None, *, verify: bool = True) -> int:
    """
    Pull observations from a HuggingFace dataset into the local store.

    Only flat ``observations/<cid>.cbor`` entries that are not already
    present locally are downloaded. Each payload is validated before it is
    written into the content-addressed ``cbor_dir``. This way a corrupt
    download is never left behind, where the ``exists()`` check would
    otherwise skip it on every later pull.

    Args:
        dataset_id: HF dataset (default: central dataset).
        verify: When True (default), reject files whose bytes do not hash to
            the CID in their filename.

    Returns:
        Number of observations pulled.
    """
    try:
        from huggingface_hub import hf_hub_download, list_repo_files
    except ImportError:
        print("pip install huggingface_hub")
        return 0

    store = _get_store()
    dataset_id = dataset_id or CENTRAL_DATASET

    # List observation files
    try:
        files = list_repo_files(dataset_id, repo_type="dataset")
        obs_files = [f for f in files if f.startswith("observations/") and f.endswith(".cbor")]
    except Exception as e:
        print(f"Could not list dataset: {e}")
        return 0

    pulled = 0
    for file_path in obs_files:
        cid = file_path.removeprefix("observations/").removesuffix(".cbor")
        # Nested entries (observations/a/b.cbor) are not flat <cid>.cbor
        # files; skip them rather than writing outside cbor_dir.
        if not cid or "/" in cid:
            continue
        local_path = store.cbor_dir / f"{cid}.cbor"
        if local_path.exists():
            continue
        try:
            downloaded = hf_hub_download(
                repo_id=dataset_id,
                filename=file_path,
                repo_type="dataset",
                local_dir=str(store.lattice_dir / "_hf_cache"),
            )
            raw = Path(downloaded).read_bytes()
            if verify and compute_cid(raw) != cid:
                print(f"Pull error for {cid[:16]}: content does not match CID")
                continue
            # Decode before writing, so an invalid payload never lands in
            # the content-addressed store.
            receipt = Receipt.from_dict(dag_cbor.decode(raw))
            local_path.write_bytes(raw)

            conn = sqlite3.connect(store.db_path)
            try:
                conn.execute("""
                    INSERT OR REPLACE INTO observations
                    (cid, model_id, merkle_root, timestamp, parent_cid, pinned)
                    VALUES (?, ?, ?, ?, ?, 1)
                """, (cid, receipt.model_id, receipt.merkle_root,
                      receipt.timestamp, receipt.parent_cid))
                conn.commit()
            finally:
                conn.close()
            pulled += 1
        except Exception as e:
            print(f"Pull error for {cid[:16]}: {e}")

    return pulled
# =============================================================================
# PUBLIC API
# =============================================================================
# Process-wide LocalStore, created lazily by _get_store().
_store: Optional[LocalStore] = None


def _get_store() -> LocalStore:
    """Return the shared LocalStore, constructing it on the first call."""
    global _store
    if _store is not None:
        return _store
    _store = LocalStore()
    return _store
def _json_default(obj):
    """
    ``json.dumps`` fallback for values it cannot serialize natively.

    Objects with a callable ``item()`` (numpy/torch-style scalars) are
    unwrapped to a Python scalar. Multi-element arrays, whose ``item()``
    raises ValueError (numpy) or RuntimeError (torch), are converted with
    ``tolist()`` when they provide it. Anything else is stringified.
    """
    item = getattr(obj, "item", None)
    if callable(item):
        try:
            return item()
        except (ValueError, RuntimeError):
            to_list = getattr(obj, "tolist", None)
            if not callable(to_list):
                raise
            return to_list()
    return str(obj)
def _sanitize(data: Any) -> Any:
    """
    Recursively convert numpy/torch-style values into plain Python values.

    - dict: values are sanitized; keys are left unchanged.
    - list/tuple: becomes a list of sanitized elements.
    - object with a callable ``item()``: unwrapped to a Python scalar. If
      ``item()`` raises because the object holds several elements (e.g. a
      multi-element numpy array), ``tolist()`` is used instead and its
      result sanitized.
    - anything else: returned unchanged.

    A new container is built, so the caller's input is not mutated.
    """
    if isinstance(data, dict):
        return {k: _sanitize(v) for k, v in data.items()}
    if isinstance(data, (list, tuple)):
        return [_sanitize(x) for x in data]
    item = getattr(data, "item", None)
    if callable(item):
        try:
            return item()
        except (ValueError, RuntimeError):
            # numpy raises ValueError and torch RuntimeError for size > 1.
            to_list = getattr(data, "tolist", None)
            if not callable(to_list):
                raise
            return _sanitize(to_list())
    return data
def observe(
    model_id: str,
    data: Dict[str, Any],
    parent_cid: Optional[str] = None,
    sync: bool = True,
) -> Receipt:
    """
    Record an observation.

    Args:
        model_id: Identifier for the model/system (e.g., "crafter", "gpt-4")
        data: Observation dict. It is copied through ``_sanitize`` (numpy-style
            values become plain Python), so the caller's dict is not mutated.
            The stored copy also gains ``_genesis`` and ``_model_id`` keys.
        parent_cid: Optional CID of the previous observation. When None, the
            latest local receipt for ``model_id`` (if any) is used.
        sync: Whether to upload to HuggingFace after saving (default: True).
            The upload runs synchronously; failures are ignored.

    Returns:
        Receipt with CID

    Example:
        receipt = observe("crafter", {"reward": 2.1, "step": 100})
        print(receipt.cid)  # bafyrei...
    """
    store = _get_store()

    # Sanitize data to ensure JSON/CBOR serializability (numpy types -> python)
    data = _sanitize(data)

    # Get parent if chaining
    if parent_cid is None:
        latest = store.get_latest(model_id)
        if latest:
            parent_cid = latest.cid

    # Read the clock once so merkle_root is derived from the same timestamp
    # that is stored on the receipt (previously two different time.time()
    # calls were used).
    timestamp = time.time()

    # Compute merkle root (legacy compat)
    # Using default=_json_default to handle numpy types often found in AI systems
    merkle_data = f"{model_id}:{json.dumps(data, sort_keys=True, default=_json_default)}:{timestamp}"
    merkle_root = hashlib.sha256(merkle_data.encode()).hexdigest()[:16]

    # Link to genesis
    data["_genesis"] = get_genesis_root()
    data["_model_id"] = model_id

    receipt = Receipt(
        cid="",  # filled in by store.save()
        model_id=model_id,
        merkle_root=merkle_root,
        timestamp=timestamp,
        data=data,
        parent_cid=parent_cid,
    )

    # Save locally (computes CID)
    cid = store.save(receipt)
    receipt.cid = cid

    # Best-effort upload. The local save already succeeded, and that is the
    # authoritative copy, so sync errors must not fail the call.
    if sync:
        try:
            sync_observation(cid, store.cbor_dir / f"{cid}.cbor")
        except Exception:
            pass

    return receipt
def query(
    model_id: Optional[str] = None,
    since: Optional[float] = None,
    limit: int = 100,
    include_remote: bool = False,
) -> List[Receipt]:
    """
    Query observations.

    Args:
        model_id: Filter by model ID
        since: Only observations after this timestamp
        limit: Maximum results
        include_remote: If True, first pull new observations from the central
            HuggingFace dataset into the local store (slower; needs network
            and huggingface_hub). Results are always served from the local
            index.

    Returns:
        List of receipts, newest first

    Example:
        # Get all crafter observations
        for receipt in query(model_id="crafter"):
            print(receipt.data["reward"])
    """
    # include_remote used to be accepted but silently ignored. Remote
    # observations are only discoverable via the HF dataset listing
    # (gateways are CID-addressed), so pull them locally first.
    if include_remote:
        pull_from_hf()

    store = _get_store()
    return store.query(model_id=model_id, since=since, limit=limit)
def get(cid: str) -> Optional[Receipt]:
    """
    Look up a single observation by CID.

    Delegates to ``fetch_receipt`` with the shared store. The local store
    is consulted before the public IPFS gateways.
    """
    return fetch_receipt(cid, _get_store())
# =============================================================================
# DISCOVERY - Leverages HuggingFace's catalog
# =============================================================================
def _local_model_counts(store: "LocalStore") -> Dict[str, int]:
    """Return ``{model_id: observation count}`` from the local SQLite index."""
    conn = sqlite3.connect(store.db_path)
    try:
        rows = conn.execute(
            "SELECT model_id, COUNT(*) FROM observations GROUP BY model_id"
        ).fetchall()
    finally:
        conn.close()
    return {model_id: count for model_id, count in rows}


def discover_models(dataset_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Summarize which models are present locally and how much data is remote.

    Remote files are stored flat as ``observations/{cid}.cbor``, so the
    model_id is not visible from the listing. Only the remote file count
    is reported.

    Returns:
        ``{"local_models": {model_id: count}, "remote_observation_count": int,
        "dataset": str}``. Without huggingface_hub, returns
        ``{"local_models": {...}, "remote": "unavailable"}``. On any other
        error, returns ``{"error": message}``.
    """
    dataset_id = dataset_id or CENTRAL_DATASET
    try:
        from huggingface_hub import list_repo_files
        files = list_repo_files(dataset_id, repo_type="dataset")

        local_models = _local_model_counts(_get_store())
        remote_count = sum(
            1 for f in files if f.startswith("observations/") and f.endswith(".cbor")
        )
        return {
            "local_models": local_models,
            "remote_observation_count": remote_count,
            "dataset": dataset_id,
        }
    except ImportError:
        # No huggingface_hub - return local only
        return {"local_models": _local_model_counts(_get_store()), "remote": "unavailable"}
    except Exception as e:
        return {"error": str(e)}
def discover_datasets(query_str: str = "cascade") -> List[Dict[str, Any]]:
    """
    Search the HuggingFace Hub for datasets matching ``query_str``.

    This is how users find each other's lattices. At most 50 results are
    requested.

    Returns:
        One summary dict per dataset (id, author, downloads, likes,
        last_modified, tags). On failure, a single-element list
        ``[{"error": message}]``.
    """
    try:
        from huggingface_hub import HfApi

        found = HfApi().list_datasets(search=query_str, limit=50)
        return [
            {
                "id": ds.id,
                "author": ds.author,
                "downloads": ds.downloads,
                "likes": ds.likes,
                "last_modified": None if not ds.last_modified else str(ds.last_modified),
                "tags": getattr(ds, "tags", []),
            }
            for ds in found
        ]
    except ImportError:
        return [{"error": "huggingface_hub not installed"}]
    except Exception as e:
        return [{"error": str(e)}]
def discover_live(dataset_id: str = None) -> Dict[str, Any]:
    """
    Report recent activity on a dataset, using its commit history as a feed.

    Returns:
        ``{"dataset", "recent_commits", "total_commits"}``. recent_commits
        holds up to the first 20 commits returned by the Hub.
        total_commits is "unknown" if the result has no length. On failure,
        returns ``{"error": message}``.
    """
    target = dataset_id or CENTRAL_DATASET
    try:
        from huggingface_hub import HfApi

        history = HfApi().list_repo_commits(
            repo_id=target,
            repo_type="dataset",
        )

        recent = [
            {
                "commit_id": c.commit_id,
                "created_at": str(c.created_at),
                "title": c.title,
                "authors": list(c.authors or []),
            }
            for _, c in zip(range(20), history)
        ]

        return {
            "dataset": target,
            "recent_commits": recent,
            "total_commits": len(history) if hasattr(history, "__len__") else "unknown",
        }
    except ImportError:
        return {"error": "huggingface_hub not installed"}
    except Exception as e:
        return {"error": str(e)}
def dataset_info(dataset_id: str = None) -> Dict[str, Any]:
    """
    Fetch HuggingFace metadata for a dataset (default: the central one).

    Returns:
        A flat dict of id, author, downloads, likes, timestamps, privacy,
        tags and card data. On failure, returns ``{"error": message}``.
    """
    target = dataset_id or CENTRAL_DATASET
    try:
        from huggingface_hub import HfApi

        meta = HfApi().dataset_info(target)
        return {
            "id": meta.id,
            "author": meta.author,
            "downloads": meta.downloads,
            "likes": meta.likes,
            "created_at": None if not meta.created_at else str(meta.created_at),
            "last_modified": None if not meta.last_modified else str(meta.last_modified),
            "private": meta.private,
            "tags": meta.tags,
            "card_data": getattr(meta, "card_data", None),
        }
    except ImportError:
        return {"error": "huggingface_hub not installed"}
    except Exception as e:
        return {"error": str(e)}
def stats() -> Dict[str, Any]:
    """
    Summarize the local store.

    Returns:
        total_observations, pinned_observations (rows marked pinned by a
        sync or pull), per-model counts, the genesis root, and the lattice
        directory.
    """
    store = _get_store()
    conn = sqlite3.connect(store.db_path)
    try:
        total = conn.execute("SELECT COUNT(*) FROM observations").fetchone()[0]
        pinned = conn.execute("SELECT COUNT(*) FROM observations WHERE pinned = 1").fetchone()[0]
        models = conn.execute(
            "SELECT model_id, COUNT(*) FROM observations GROUP BY model_id"
        ).fetchall()
    finally:
        # Close even if a query fails.
        conn.close()

    return {
        "total_observations": total,
        "pinned_observations": pinned,
        "models": {m: c for m, c in models},
        "genesis_root": get_genesis_root(),
        "lattice_dir": str(store.lattice_dir),
    }
# =============================================================================
# CLI
# =============================================================================
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="CASCADE Store")
parser.add_argument("command", choices=["stats", "query", "get", "push", "pull", "sync"])
parser.add_argument("--model", help="Model ID filter")
parser.add_argument("--cid", help="CID to fetch")
parser.add_argument("--limit", type=int, default=10)
parser.add_argument("--dataset", help="HF dataset (default: jtwspace/cascade-observations)")
args = parser.parse_args()
if args.command == "stats":
s = stats()
print(f"Total: {s['total_observations']}")
print(f"Synced: {s['pinned_observations']}")
print(f"Genesis: {s['genesis_root']}")
print(f"Central dataset: {CENTRAL_DATASET}")
if USER_DATASET:
print(f"User dataset: {USER_DATASET}")
print(f"Models:")
for model, count in s["models"].items():
print(f" {model}: {count}")
elif args.command == "query":
receipts = query(model_id=args.model, limit=args.limit)
for r in receipts:
print(f"{r.cid[:20]}... | {r.model_id} | {r.data}")
elif args.command == "get":
if not args.cid:
print("--cid required")
else:
r = get(args.cid)
if r:
print(json.dumps(r.to_dict(), indent=2, default=str))
else:
print("Not found")
elif args.command == "push":
print(f"Pushing to HuggingFace: {CENTRAL_DATASET}")
result = sync_all()
print(f"Synced {result['synced']}, Failed {result['failed']}")
elif args.command == "pull":
dataset = args.dataset or CENTRAL_DATASET
print(f"Pulling from: {dataset}")
count = pull_from_hf(dataset)
print(f"Pulled {count} observations")
elif args.command == "sync":
# Bidirectional sync
print(f"Syncing with HuggingFace...")
pulled = pull_from_hf()
result = sync_all()
print(f"Pulled {pulled}, Pushed {result['synced']}")