""" CASCADE SDK - Universal AI Observation Layer Usage: import cascade cascade.init() # Now every call emits a receipt automatically import openai response = openai.chat.completions.create(...) # Receipt emitted """ import threading import queue from typing import Optional, Dict, Any, List from datetime import datetime, timezone # Import our observation infrastructure from .observation import ObservationManager from .identity import ModelRegistry from .genesis import ProvenanceChain class CascadeSDK: """Main SDK singleton - manages patching and emission.""" _instance = None _initialized = False def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if CascadeSDK._initialized: return self.observation_manager = ObservationManager() self.model_registry = ModelRegistry() self.emission_queue = queue.Queue() self.background_thread = None self.running = False self.patched_providers = set() self.config = { "emit_async": True, "lattice_path": "lattice/observations", "verbose": False, } CascadeSDK._initialized = True def init(self, **kwargs): """ Initialize CASCADE and auto-patch available providers. Args: emit_async: Whether to emit receipts in background (default: True) verbose: Print when receipts are emitted (default: False) providers: List of providers to patch, or 'all' (default: 'all') """ self.config.update(kwargs) # Start background emission thread if self.config["emit_async"] and not self.running: self.running = True self.background_thread = threading.Thread( target=self._emission_worker, daemon=True ) self.background_thread.start() # Auto-patch available providers providers = kwargs.get("providers", "all") self._patch_providers(providers) if self.config["verbose"]: print(f"[CASCADE] Initialized. 
    def _patch_providers(self, providers):
        """Patch LLM provider libraries."""
        from .patches import (
            patch_openai,
            patch_anthropic,
            patch_huggingface,
            patch_ollama,
            patch_litellm,
        )

        patch_map = {
            "openai": patch_openai,
            "anthropic": patch_anthropic,
            "huggingface": patch_huggingface,
            "ollama": patch_ollama,
            "litellm": patch_litellm,
        }

        if providers == "all":
            providers = list(patch_map.keys())

        for provider in providers:
            if provider in patch_map:
                try:
                    patch_map[provider](self)
                    self.patched_providers.add(provider)
                except ImportError:
                    # Provider not installed, skip
                    pass
                except Exception as e:
                    if self.config["verbose"]:
                        print(f"[CASCADE] Failed to patch {provider}: {e}")

    def _emission_worker(self):
        """Background thread that processes emission queue."""
        while self.running:
            try:
                receipt_data = self.emission_queue.get(timeout=1.0)
                self._emit_receipt(receipt_data)
            except queue.Empty:
                continue
            except Exception as e:
                if self.config["verbose"]:
                    print(f"[CASCADE] Emission error: {e}")

    def _emit_receipt(self, receipt_data: Dict[str, Any]):
        """Actually write the receipt to lattice."""
        import hashlib
        import uuid

        try:
            # Create provenance chain for this observation
            model_id = receipt_data["model_id"]
            input_text = receipt_data["input"][:1000]    # Truncate
            output_text = receipt_data["output"][:2000]  # Truncate

            # Compute hashes
            input_hash = hashlib.sha256(input_text.encode()).hexdigest()[:16]
            model_hash = hashlib.sha256(model_id.encode()).hexdigest()[:16]
            session_id = str(uuid.uuid4())[:8]

            chain = ProvenanceChain(
                session_id=session_id,
                model_id=model_id,
                model_hash=model_hash,
                input_hash=input_hash,
            )

            # Add inference record
            from cascade.core.provenance import ProvenanceRecord

            record = ProvenanceRecord(
                layer_name="inference",
                layer_idx=0,
                state_hash=hashlib.sha256(output_text.encode()).hexdigest()[:16],
                parent_hashes=[input_hash],
                params_hash=model_hash,
                shape=[len(output_text)],
                dtype="text",
                stats={
                    **receipt_data.get("metrics", {}),
                    "provider": receipt_data.get("context", {}).get("provider", "unknown"),
                    "timestamp": receipt_data.get("timestamp", datetime.now(timezone.utc).isoformat()),
                },
                execution_order=0,
            )
            chain.add_record(record)
            chain.finalize()

            observation = self.observation_manager.observe_model(
                model_id=model_id,
                chain=chain,
                user_hash=receipt_data.get("user_hash"),
            )

            if self.config["verbose"]:
                print(f"[CASCADE] Receipt: {observation.merkle_root[:16]}... -> {model_id}")

            return observation

        except Exception as e:
            if self.config["verbose"]:
                import traceback
                print(f"[CASCADE] Failed to emit: {e}")
                traceback.print_exc()
            return None

    def observe(
        self,
        model_id: str,
        input_data: Any,
        output_data: Any,
        metrics: Optional[Dict] = None,
        context: Optional[Dict] = None,
    ):
        """
        Manually emit an observation receipt.

        Called automatically by patches, but can be called directly.
        """
        receipt_data = {
            "model_id": model_id,
            "input": str(input_data),
            "output": str(output_data),
            "metrics": metrics or {},
            "context": context or {},
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        if self.config["emit_async"]:
            self.emission_queue.put(receipt_data)
        else:
            self._emit_receipt(receipt_data)

    def shutdown(self):
        """Stop background emission and flush queue."""
        self.running = False
        if self.background_thread:
            self.background_thread.join(timeout=5.0)

        # Flush remaining items
        while not self.emission_queue.empty():
            try:
                receipt_data = self.emission_queue.get_nowait()
                self._emit_receipt(receipt_data)
            except queue.Empty:
                break
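
# --- Illustrative sketch (not part of the SDK surface) ----------------------
# The real patch functions live in `.patches` and are not shown in this file.
# Assuming each one receives the SDK instance and monkey-patches a provider
# entry point, the general shape is sketched below against an arbitrary object
# attribute. The helper name `_example_wrap_call` and its parameters are
# hypothetical; real patches target provider-specific call sites. A production
# patch would also guard the sdk.observe(...) call itself, so a receipt
# failure can never break the wrapped provider call.
def _example_wrap_call(sdk: CascadeSDK, owner: Any, attr: str, model_id: str) -> None:
    """Sketch: replace `owner.attr` with a wrapper that also emits a receipt."""
    original = getattr(owner, attr)

    def wrapped(*args, **kwargs):
        response = original(*args, **kwargs)  # Forward the call unchanged
        sdk.observe(
            model_id=model_id,
            input_data=kwargs or args,
            output_data=response,
            context={"provider": attr},
        )
        return response

    setattr(owner, attr, wrapped)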
""" receipt_data = { "model_id": model_id, "input": str(input_data), "output": str(output_data), "metrics": metrics or {}, "context": context or {}, "timestamp": datetime.now(timezone.utc).isoformat(), } if self.config["emit_async"]: self.emission_queue.put(receipt_data) else: self._emit_receipt(receipt_data) def shutdown(self): """Stop background emission and flush queue.""" self.running = False if self.background_thread: self.background_thread.join(timeout=5.0) # Flush remaining items while not self.emission_queue.empty(): try: receipt_data = self.emission_queue.get_nowait() self._emit_receipt(receipt_data) except queue.Empty: break # Global SDK instance _sdk = CascadeSDK() def init(**kwargs): """Initialize CASCADE observation layer.""" return _sdk.init(**kwargs) def observe(model_id: str, input_data: Any, output_data: Any, **kwargs): """Manually emit an observation.""" return _sdk.observe(model_id, input_data, output_data, **kwargs) def shutdown(): """Shutdown CASCADE (flush pending receipts).""" return _sdk.shutdown() # Convenience: allow `import cascade; cascade.init()` __all__ = ["init", "observe", "shutdown", "CascadeSDK"]