| """ | |
| CASCADE Store - Simple observe/query interface with HuggingFace | |
| The goal: make it as easy as possible to store and retrieve provenance. | |
| from cascade.store import observe, query | |
| # Write - saves locally + syncs to HuggingFace | |
| receipt = observe(model_id="crafter", data={"reward": 2.1, "step": 100}) | |
| print(receipt.cid) # bafyrei... (content hash for verification) | |
| # Read - queries local store + HuggingFace | |
| past = query(model_id="crafter") | |
| for obs in past: | |
| print(obs["reward"], obs["cid"]) | |
| Architecture: | |
| 1. Local SQLite index (fast queries) | |
| 2. CBOR files for full data (content-addressed) | |
| 3. HuggingFace datasets for sync (unlimited, free) | |
| 4. CIDs computed for verification (no IPFS daemon needed) | |
| Dual-write model: | |
| - User's own HF dataset (they own their data) | |
| - Central cascade-observations dataset (Dreamer sees everything) | |
| No daemon. No server. Just `huggingface-cli login` once. | |
| """ | |
import hashlib
import json
import os
import shutil
import sqlite3
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import dag_cbor
from multiformats import CID, multihash

from cascade.genesis import get_genesis_root

# =============================================================================
# CONSTANTS
# =============================================================================

# HuggingFace datasets - unlimited free storage
CENTRAL_DATASET = "tostido/cascade-observations"  # Dreamer reads this
USER_DATASET = os.environ.get("CASCADE_USER_DATASET")  # Optional: user's own dataset

# Default lattice directory (local cache)
DEFAULT_LATTICE_DIR = Path.home() / ".cascade" / "lattice"

# Public IPFS gateways for fallback reads
IPFS_GATEWAYS = [
    "https://ipfs.io/ipfs/",
    "https://dweb.link/ipfs/",
    "https://gateway.pinata.cloud/ipfs/",
]

# =============================================================================
# DATA STRUCTURES
# =============================================================================

@dataclass
class Receipt:
    """
    A provenance receipt - proof that an observation happened.

    Content-addressed via CID. Can be verified by anyone.
    """
    cid: str                          # IPFS content identifier
    model_id: str                     # What model/system was observed
    merkle_root: str                  # Chain hash (legacy compat)
    timestamp: float                  # When
    data: Dict[str, Any]              # The observation data
    parent_cid: Optional[str] = None  # Links to previous observation

    def to_dict(self) -> Dict[str, Any]:
        return {
            "cid": self.cid,
            "model_id": self.model_id,
            "merkle_root": self.merkle_root,
            "timestamp": self.timestamp,
            "data": self.data,
            "parent_cid": self.parent_cid,
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Receipt":
        return cls(
            cid=d["cid"],
            model_id=d["model_id"],
            merkle_root=d["merkle_root"],
            timestamp=d["timestamp"],
            data=d["data"],
            parent_cid=d.get("parent_cid"),
        )

# =============================================================================
# CONTENT ADDRESSING
# =============================================================================

def compute_cid(data: bytes) -> str:
    """
    Compute a CIDv1 (base32) from raw bytes.

    CID = multicodec(dag-cbor) + multihash(sha256(data))
    """
    digest = hashlib.sha256(data).digest()
    mh = multihash.wrap(digest, "sha2-256")
    cid = CID("base32", 1, "dag-cbor", mh)
    return str(cid)


def data_to_cid(data: Dict[str, Any]) -> tuple[str, bytes]:
    """Convert a dict to (CID, dag-cbor encoded bytes)."""
    encoded = dag_cbor.encode(data)
    cid = compute_cid(encoded)
    return cid, encoded
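

# A minimal verification sketch (an assumption, not part of the original API).
# Because a CID is just a sha2-256 multihash over the dag-cbor bytes, anyone
# holding a {cid}.cbor file (the LocalStore layout below) can re-derive the
# CID and compare it against the filename.
def verify_cbor_file(cbor_path: Path) -> bool:
    """Return True if the file's content hashes back to the CID in its name."""
    expected_cid = cbor_path.stem  # filename is "{cid}.cbor"
    actual_cid = compute_cid(cbor_path.read_bytes())
    return actual_cid == expected_cid
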
# =============================================================================
# LOCAL STORE
# =============================================================================

class LocalStore:
    """
    SQLite-backed local store with CBOR files.

    Fast queries via the SQLite index; full data lives in
    content-addressed CBOR files.
    """

    def __init__(self, lattice_dir: Optional[Path] = None):
        self.lattice_dir = lattice_dir or DEFAULT_LATTICE_DIR
        self.lattice_dir.mkdir(parents=True, exist_ok=True)
        self.cbor_dir = self.lattice_dir / "cbor"
        self.cbor_dir.mkdir(exist_ok=True)
        self.db_path = self.lattice_dir / "index.db"
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite schema."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS observations (
                cid TEXT PRIMARY KEY,
                model_id TEXT NOT NULL,
                merkle_root TEXT NOT NULL,
                timestamp REAL NOT NULL,
                parent_cid TEXT,
                pinned INTEGER DEFAULT 0,
                created_at REAL DEFAULT (strftime('%s', 'now'))
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_model_id ON observations(model_id)
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_timestamp ON observations(timestamp DESC)
        """)
        conn.commit()
        conn.close()

    def save(self, receipt: Receipt) -> str:
        """Save a receipt to the local store. Returns its CID."""
        # Hash the receipt with the cid field blanked, so the CID never
        # covers itself; re-saving a previously fetched receipt then
        # yields the same CID instead of a new one.
        data = receipt.to_dict()
        data["cid"] = ""
        cid, encoded = data_to_cid(data)
        cbor_path = self.cbor_dir / f"{cid}.cbor"
        cbor_path.write_bytes(encoded)
        # Update the receipt with the computed CID
        receipt.cid = cid
        # Index in SQLite
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT OR REPLACE INTO observations
            (cid, model_id, merkle_root, timestamp, parent_cid)
            VALUES (?, ?, ?, ?, ?)
        """, (cid, receipt.model_id, receipt.merkle_root,
              receipt.timestamp, receipt.parent_cid))
        conn.commit()
        conn.close()
        return cid

    def get(self, cid: str) -> Optional[Receipt]:
        """Get a receipt by CID."""
        cbor_path = self.cbor_dir / f"{cid}.cbor"
        if not cbor_path.exists():
            return None
        data = dag_cbor.decode(cbor_path.read_bytes())
        data["cid"] = cid  # stored payload carries a blank cid; the filename is authoritative
        return Receipt.from_dict(data)

    def query(
        self,
        model_id: Optional[str] = None,
        since: Optional[float] = None,
        limit: int = 100,
    ) -> List[Receipt]:
        """Query receipts with optional filters."""
        conn = sqlite3.connect(self.db_path)
        sql = "SELECT cid FROM observations WHERE 1=1"
        params = []
        if model_id:
            sql += " AND model_id = ?"
            params.append(model_id)
        if since:
            sql += " AND timestamp > ?"
            params.append(since)
        sql += " ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)
        cursor = conn.execute(sql, params)
        cids = [row[0] for row in cursor.fetchall()]
        conn.close()
        # Load the full receipts from CBOR
        receipts = []
        for cid in cids:
            receipt = self.get(cid)
            if receipt:
                receipts.append(receipt)
        return receipts

    def get_latest(self, model_id: str) -> Optional[Receipt]:
        """Get the most recent receipt for a model."""
        results = self.query(model_id=model_id, limit=1)
        return results[0] if results else None

    def count(self, model_id: Optional[str] = None) -> int:
        """Count observations."""
        conn = sqlite3.connect(self.db_path)
        if model_id:
            cursor = conn.execute(
                "SELECT COUNT(*) FROM observations WHERE model_id = ?",
                (model_id,)
            )
        else:
            cursor = conn.execute("SELECT COUNT(*) FROM observations")
        count = cursor.fetchone()[0]
        conn.close()
        return count
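

# Illustrative direct usage of LocalStore (a sketch; the path and values are
# placeholders, and most callers should prefer the module-level observe/query):
#
#     store = LocalStore(Path("/tmp/cascade-demo"))
#     r = Receipt(cid="", model_id="demo", merkle_root="0" * 16,
#                 timestamp=time.time(), data={"reward": 1.0})
#     cid = store.save(r)  # computes and assigns the CID
#     assert store.get(cid).data["reward"] == 1.0
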
# =============================================================================
# IPFS GATEWAY ACCESS
# =============================================================================

def fetch_from_gateway(cid: str, timeout: float = 10.0) -> Optional[bytes]:
    """
    Fetch data from public IPFS gateways.

    Tries multiple gateways in sequence. No daemon needed.
    """
    import requests  # lazy import: only needed for remote reads

    for gateway in IPFS_GATEWAYS:
        url = f"{gateway}{cid}"
        try:
            resp = requests.get(url, timeout=timeout)
            if resp.status_code == 200:
                return resp.content
        except Exception:
            continue
    return None


def fetch_receipt(cid: str, local_store: Optional[LocalStore] = None) -> Optional[Receipt]:
    """
    Fetch a receipt by CID, checking the local store first, then IPFS.
    """
    # Check local first
    if local_store:
        receipt = local_store.get(cid)
        if receipt:
            return receipt
    # Fall back to public IPFS gateways
    data = fetch_from_gateway(cid)
    if data:
        try:
            decoded = dag_cbor.decode(data)
            decoded["cid"] = cid  # payload carries a blank cid; we fetched by it
            receipt = Receipt.from_dict(decoded)
            # Cache locally
            if local_store:
                local_store.save(receipt)
            return receipt
        except Exception:
            pass
    return None

# =============================================================================
# HUGGINGFACE SYNC - Unlimited free storage
# =============================================================================
# (Uses constants from the top of the file: CENTRAL_DATASET, USER_DATASET)

def _upload_to_hf(filepath: Path, cid: str, dataset_id: str) -> bool:
    """Upload a single observation to a HuggingFace dataset."""
    try:
        from huggingface_hub import HfApi

        api = HfApi()
        api.upload_file(
            path_or_fileobj=str(filepath),
            path_in_repo=f"observations/{cid}.cbor",
            repo_id=dataset_id,
            repo_type="dataset",
        )
        return True
    except Exception:
        return False


def sync_observation(cid: str, filepath: Path) -> dict:
    """
    Sync a single observation to HuggingFace.

    Dual-write:
        1. Central dataset (CENTRAL_DATASET) - Dreamer sees all
        2. User's dataset (if CASCADE_USER_DATASET is set) - they own their data
    """
    results = {"central": False, "user": False}
    # Always sync to central
    results["central"] = _upload_to_hf(filepath, cid, CENTRAL_DATASET)
    # Optionally sync to the user's own dataset
    if USER_DATASET:
        results["user"] = _upload_to_hf(filepath, cid, USER_DATASET)
    return results
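

# Illustrative dual-write setup (shell; the dataset name is a placeholder):
#
#     huggingface-cli login                         # one-time auth
#     export CASCADE_USER_DATASET=you/your-lattice  # optional personal copy
#
# With CASCADE_USER_DATASET unset, observations land only in CENTRAL_DATASET.
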
def sync_all() -> dict:
    """
    Sync all local observations to HuggingFace.

    Returns:
        {"synced": count, "failed": count}
    """
    store = _get_store()
    cbor_files = list(store.cbor_dir.glob("*.cbor"))
    synced = 0
    failed = 0
    for cbor_path in cbor_files:
        cid = cbor_path.stem
        results = sync_observation(cid, cbor_path)
        if results["central"]:
            synced += 1
            # Mark as pinned (synced) in the local index
            conn = sqlite3.connect(store.db_path)
            conn.execute("UPDATE observations SET pinned = 1 WHERE cid = ?", (cid,))
            conn.commit()
            conn.close()
        else:
            failed += 1
    return {"synced": synced, "failed": failed}

def pull_from_hf(dataset_id: Optional[str] = None) -> int:
    """
    Pull observations from a HuggingFace dataset.

    Args:
        dataset_id: HF dataset (default: the central dataset)

    Returns:
        Number of observations pulled
    """
    try:
        from huggingface_hub import hf_hub_download, list_repo_files
    except ImportError:
        print("pip install huggingface_hub")
        return 0

    store = _get_store()
    dataset_id = dataset_id or CENTRAL_DATASET

    # List observation files in the dataset repo
    try:
        files = list_repo_files(dataset_id, repo_type="dataset")
        obs_files = [f for f in files if f.startswith("observations/") and f.endswith(".cbor")]
    except Exception as e:
        print(f"Could not list dataset: {e}")
        return 0

    pulled = 0
    for file_path in obs_files:
        cid = file_path.replace("observations/", "").replace(".cbor", "")
        local_path = store.cbor_dir / f"{cid}.cbor"
        if local_path.exists():
            continue  # already have it
        try:
            downloaded = hf_hub_download(
                repo_id=dataset_id,
                filename=file_path,
                repo_type="dataset",
                local_dir=str(store.lattice_dir / "_hf_cache"),
            )
            shutil.copy(downloaded, local_path)
            # Index the pulled observation, marking it as already pinned
            data = dag_cbor.decode(local_path.read_bytes())
            receipt = Receipt.from_dict(data)
            conn = sqlite3.connect(store.db_path)
            conn.execute("""
                INSERT OR REPLACE INTO observations
                (cid, model_id, merkle_root, timestamp, parent_cid, pinned)
                VALUES (?, ?, ?, ?, ?, 1)
            """, (cid, receipt.model_id, receipt.merkle_root,
                  receipt.timestamp, receipt.parent_cid))
            conn.commit()
            conn.close()
            pulled += 1
        except Exception as e:
            print(f"Pull error for {cid[:16]}: {e}")
    return pulled

# =============================================================================
# PUBLIC API
# =============================================================================

# Global store instance
_store: Optional[LocalStore] = None


def _get_store() -> LocalStore:
    """Get or create the global store."""
    global _store
    if _store is None:
        _store = LocalStore()
    return _store


def _json_default(obj):
    """Handle non-serializable types like numpy scalars."""
    if hasattr(obj, "item") and callable(obj.item):
        return obj.item()
    return str(obj)


def _sanitize(data: Any) -> Any:
    """Recursively convert numpy scalar types to plain Python types."""
    if isinstance(data, dict):
        return {k: _sanitize(v) for k, v in data.items()}
    elif isinstance(data, (list, tuple)):
        return [_sanitize(x) for x in data]
    elif hasattr(data, "item") and callable(data.item):
        return data.item()
    return data

def observe(
    model_id: str,
    data: Dict[str, Any],
    parent_cid: Optional[str] = None,
    sync: bool = True,
) -> Receipt:
    """
    Record an observation.

    Args:
        model_id: Identifier for the model/system (e.g., "crafter", "gpt-4")
        data: Observation data (any JSON-serializable dict)
        parent_cid: Optional CID of the previous observation (for chains)
        sync: Whether to sync to HuggingFace (default: True)

    Returns:
        Receipt with CID

    Example:
        receipt = observe("crafter", {"reward": 2.1, "step": 100})
        print(receipt.cid)  # bafyrei...
    """
    store = _get_store()

    # Sanitize to ensure JSON/CBOR serializability (numpy types -> Python)
    data = _sanitize(data)

    # Chain to the latest observation if no parent was given
    if parent_cid is None:
        latest = store.get_latest(model_id)
        if latest:
            parent_cid = latest.cid

    # Compute merkle root (legacy compat).
    # default=_json_default handles numpy types often found in AI systems.
    merkle_data = f"{model_id}:{json.dumps(data, sort_keys=True, default=_json_default)}:{time.time()}"
    merkle_root = hashlib.sha256(merkle_data.encode()).hexdigest()[:16]

    # Link to genesis
    genesis_root = get_genesis_root()
    data["_genesis"] = genesis_root
    data["_model_id"] = model_id

    # Create the receipt
    receipt = Receipt(
        cid="",  # computed on save
        model_id=model_id,
        merkle_root=merkle_root,
        timestamp=time.time(),
        data=data,
        parent_cid=parent_cid,
    )

    # Save locally (computes the CID)
    cid = store.save(receipt)
    receipt.cid = cid

    # Auto-sync to HuggingFace (best-effort)
    if sync:
        try:
            sync_observation(cid, store.cbor_dir / f"{cid}.cbor")
        except Exception:
            pass  # local save succeeded, which is what matters

    return receipt
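

# Illustrative chaining (a sketch; values are placeholders): consecutive
# observe() calls for the same model_id link automatically through
# parent_cid, so a model's history forms a hash-linked chain:
#
#     r1 = observe("crafter", {"step": 1}, sync=False)
#     r2 = observe("crafter", {"step": 2}, sync=False)
#     assert r2.parent_cid == r1.cid
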
def query(
    model_id: Optional[str] = None,
    since: Optional[float] = None,
    limit: int = 100,
    include_remote: bool = False,
) -> List[Receipt]:
    """
    Query observations.

    Args:
        model_id: Filter by model ID
        since: Only observations after this timestamp
        limit: Maximum results
        include_remote: Reserved for searching IPFS gateways (slower);
            currently only the local store is queried.

    Returns:
        List of receipts

    Example:
        # Get all crafter observations
        for receipt in query(model_id="crafter"):
            print(receipt.data["reward"])
    """
    store = _get_store()
    return store.query(model_id=model_id, since=since, limit=limit)


def get(cid: str) -> Optional[Receipt]:
    """
    Get a specific observation by CID.

    Checks the local store first, then IPFS gateways.
    """
    store = _get_store()
    return fetch_receipt(cid, store)
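

# Illustrative end-to-end check (a sketch; the CID is a placeholder and
# verify_cbor_file is the helper assumed above, not original API): fetch a
# receipt, then confirm the cached bytes still hash to their CID.
#
#     r = get("bafyrei...")
#     if r is not None:
#         assert verify_cbor_file(_get_store().cbor_dir / f"{r.cid}.cbor")
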
# =============================================================================
# DISCOVERY - Leverages HuggingFace's catalog
# =============================================================================

def discover_models(dataset_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Discover model_ids in the lattice by scanning HuggingFace.

    HuggingFace IS the catalog - we just read it.

    Returns:
        Dict with a local model_id -> count mapping plus a remote file count
    """
    dataset_id = dataset_id or CENTRAL_DATASET
    try:
        from huggingface_hub import list_repo_files

        files = list_repo_files(dataset_id, repo_type="dataset")
        # Files are stored as observations/{cid}.cbor, so recovering a
        # model_id would require reading each file (or a manifest/index).
        # For now, report local knowledge plus a remote file count.
        store = _get_store()
        conn = sqlite3.connect(store.db_path)
        rows = conn.execute(
            "SELECT model_id, COUNT(*) FROM observations GROUP BY model_id"
        ).fetchall()
        conn.close()
        local_models = {model_id: count for model_id, count in rows}
        remote_count = len([f for f in files if f.startswith("observations/") and f.endswith(".cbor")])
        return {
            "local_models": local_models,
            "remote_observation_count": remote_count,
            "dataset": dataset_id,
        }
    except ImportError:
        # No huggingface_hub - return local knowledge only
        store = _get_store()
        conn = sqlite3.connect(store.db_path)
        rows = conn.execute(
            "SELECT model_id, COUNT(*) FROM observations GROUP BY model_id"
        ).fetchall()
        conn.close()
        return {"local_models": {m: c for m, c in rows}, "remote": "unavailable"}
    except Exception as e:
        return {"error": str(e)}

def discover_datasets(query_str: str = "cascade") -> List[Dict[str, Any]]:
    """
    Search HuggingFace for cascade-related datasets.

    This is how users FIND each other's lattices.

    Returns:
        List of dataset info dicts
    """
    try:
        from huggingface_hub import HfApi

        api = HfApi()
        datasets = api.list_datasets(search=query_str, limit=50)
        results = []
        for ds in datasets:
            results.append({
                "id": ds.id,
                "author": ds.author,
                "downloads": ds.downloads,
                "likes": ds.likes,
                "last_modified": str(ds.last_modified) if ds.last_modified else None,
                "tags": ds.tags if hasattr(ds, "tags") else [],
            })
        return results
    except ImportError:
        return [{"error": "huggingface_hub not installed"}]
    except Exception as e:
        return [{"error": str(e)}]

def discover_live(dataset_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Get LIVE activity on a dataset - who's observing right now?

    Uses HuggingFace's commit history as an activity feed.

    Returns:
        Recent commits/updates to the dataset
    """
    dataset_id = dataset_id or CENTRAL_DATASET
    try:
        from huggingface_hub import HfApi

        api = HfApi()
        commits = api.list_repo_commits(
            repo_id=dataset_id,
            repo_type="dataset",
        )
        # Take the 20 most recent
        recent = []
        for i, commit in enumerate(commits):
            if i >= 20:
                break
            recent.append({
                "commit_id": commit.commit_id,
                "created_at": str(commit.created_at),
                "title": commit.title,
                "authors": list(commit.authors or []),
            })
        return {
            "dataset": dataset_id,
            "recent_commits": recent,
            "total_commits": len(commits) if hasattr(commits, "__len__") else "unknown",
        }
    except ImportError:
        return {"error": "huggingface_hub not installed"}
    except Exception as e:
        return {"error": str(e)}

def dataset_info(dataset_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Get full metadata for a dataset from HuggingFace.

    Returns:
        Dataset metadata including downloads, likes, tags, etc.
    """
    dataset_id = dataset_id or CENTRAL_DATASET
    try:
        from huggingface_hub import HfApi

        api = HfApi()
        info = api.dataset_info(dataset_id)
        return {
            "id": info.id,
            "author": info.author,
            "downloads": info.downloads,
            "likes": info.likes,
            "created_at": str(info.created_at) if info.created_at else None,
            "last_modified": str(info.last_modified) if info.last_modified else None,
            "private": info.private,
            "tags": info.tags,
            "card_data": info.card_data if hasattr(info, "card_data") else None,
        }
    except ImportError:
        return {"error": "huggingface_hub not installed"}
    except Exception as e:
        return {"error": str(e)}

def stats() -> Dict[str, Any]:
    """Get store statistics."""
    store = _get_store()
    conn = sqlite3.connect(store.db_path)
    total = conn.execute("SELECT COUNT(*) FROM observations").fetchone()[0]
    pinned = conn.execute("SELECT COUNT(*) FROM observations WHERE pinned = 1").fetchone()[0]
    models = conn.execute(
        "SELECT model_id, COUNT(*) FROM observations GROUP BY model_id"
    ).fetchall()
    conn.close()
    return {
        "total_observations": total,
        "pinned_observations": pinned,
        "models": {m: c for m, c in models},
        "genesis_root": get_genesis_root(),
        "lattice_dir": str(store.lattice_dir),
    }

# =============================================================================
# CLI
# =============================================================================

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="CASCADE Store")
    parser.add_argument("command", choices=["stats", "query", "get", "push", "pull", "sync"])
    parser.add_argument("--model", help="Model ID filter")
    parser.add_argument("--cid", help="CID to fetch")
    parser.add_argument("--limit", type=int, default=10)
    parser.add_argument("--dataset", help=f"HF dataset (default: {CENTRAL_DATASET})")
    args = parser.parse_args()

    if args.command == "stats":
        s = stats()
        print(f"Total: {s['total_observations']}")
        print(f"Synced: {s['pinned_observations']}")
        print(f"Genesis: {s['genesis_root']}")
        print(f"Central dataset: {CENTRAL_DATASET}")
        if USER_DATASET:
            print(f"User dataset: {USER_DATASET}")
        print("Models:")
        for model, count in s["models"].items():
            print(f"  {model}: {count}")

    elif args.command == "query":
        receipts = query(model_id=args.model, limit=args.limit)
        for r in receipts:
            print(f"{r.cid[:20]}... | {r.model_id} | {r.data}")

    elif args.command == "get":
        if not args.cid:
            print("--cid required")
        else:
            r = get(args.cid)
            if r:
                print(json.dumps(r.to_dict(), indent=2, default=str))
            else:
                print("Not found")

    elif args.command == "push":
        print(f"Pushing to HuggingFace: {CENTRAL_DATASET}")
        result = sync_all()
        print(f"Synced {result['synced']}, Failed {result['failed']}")

    elif args.command == "pull":
        dataset = args.dataset or CENTRAL_DATASET
        print(f"Pulling from: {dataset}")
        count = pull_from_hf(dataset)
        print(f"Pulled {count} observations")

    elif args.command == "sync":
        # Bidirectional sync: pull first, then push anything still local-only
        print("Syncing with HuggingFace...")
        pulled = pull_from_hf()
        result = sync_all()
        print(f"Pulled {pulled}, Pushed {result['synced']}")