# cascade/sdk.py
# Initial commit - cascade-lattice 0.5.4 (commit 77bcbf1, uploaded by tostido)
"""
CASCADE SDK - Universal AI Observation Layer
Usage:
import cascade
cascade.init()
# Now every call emits a receipt automatically
import openai
response = openai.chat.completions.create(...) # Receipt emitted
"""
import threading
import queue
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
# Import our observation infrastructure
from .observation import ObservationManager
from .identity import ModelRegistry
from .genesis import ProvenanceChain
class CascadeSDK:
    """Main SDK singleton - manages patching and emission.

    Process-wide singleton (enforced in ``__new__``) that monkey-patches
    supported LLM provider libraries so each model call emits an observation
    receipt. Receipts are written either synchronously or via a daemon
    background thread draining ``emission_queue``, depending on the
    ``emit_async`` config flag.
    """

    # Singleton storage and a one-shot guard so attribute setup runs once.
    _instance = None
    _initialized = False

    def __new__(cls):
        # Classic singleton: every CascadeSDK() call yields the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every CascadeSDK() call; bail out if the
        # singleton's attributes were already set up.
        if CascadeSDK._initialized:
            return
        self.observation_manager = ObservationManager()
        self.model_registry = ModelRegistry()
        # Receipts wait here until _emission_worker (async mode) or
        # shutdown() (flush) processes them.
        self.emission_queue: queue.Queue = queue.Queue()
        self.background_thread: Optional[threading.Thread] = None
        # Loop flag for the background worker; set in init(), cleared in
        # shutdown().
        self.running = False
        # Names of providers successfully patched by _patch_providers().
        self.patched_providers = set()
        self.config = {
            "emit_async": True,
            "lattice_path": "lattice/observations",
            "verbose": False,
        }
        CascadeSDK._initialized = True

    def init(self, **kwargs):
        """
        Initialize CASCADE and auto-patch available providers.

        Args:
            emit_async: Whether to emit receipts in background (default: True)
            verbose: Print when receipts are emitted (default: False)
            providers: List of providers to patch, or 'all' (default: 'all')

        Returns:
            This SDK instance, so calls can be chained.
        """
        # Any keyword argument (including 'providers') is merged into config.
        self.config.update(kwargs)
        # Start background emission thread
        # (daemon=True so it never blocks interpreter exit; 'not self.running'
        # keeps repeated init() calls from spawning a second worker)
        if self.config["emit_async"] and not self.running:
            self.running = True
            self.background_thread = threading.Thread(
                target=self._emission_worker,
                daemon=True
            )
            self.background_thread.start()
        # Auto-patch available providers
        providers = kwargs.get("providers", "all")
        self._patch_providers(providers)
        if self.config["verbose"]:
            print(f"[CASCADE] Initialized. Patched: {self.patched_providers}")
        return self

    def _patch_providers(self, providers):
        """Patch LLM provider libraries.

        Args:
            providers: 'all' or an iterable of provider names; names not in
                the patch map are silently ignored.
        """
        # Imported lazily so merely constructing the SDK never pulls in
        # provider-specific patch code.
        from .patches import (
            patch_openai,
            patch_anthropic,
            patch_huggingface,
            patch_ollama,
            patch_litellm,
        )
        patch_map = {
            "openai": patch_openai,
            "anthropic": patch_anthropic,
            "huggingface": patch_huggingface,
            "ollama": patch_ollama,
            "litellm": patch_litellm,
        }
        if providers == "all":
            providers = list(patch_map.keys())
        for provider in providers:
            if provider in patch_map:
                try:
                    patch_map[provider](self)
                    self.patched_providers.add(provider)
                except ImportError:
                    # Provider not installed, skip
                    pass
                except Exception as e:
                    # Best-effort patching: a failing provider must not take
                    # down init(); surface the error only in verbose mode.
                    if self.config["verbose"]:
                        print(f"[CASCADE] Failed to patch {provider}: {e}")

    def _emission_worker(self):
        """Background thread that processes emission queue.

        Polls with a 1s timeout so the loop re-checks ``self.running`` and
        can exit promptly after shutdown() clears it.
        """
        while self.running:
            try:
                receipt_data = self.emission_queue.get(timeout=1.0)
                self._emit_receipt(receipt_data)
            except queue.Empty:
                # Timeout with no work; loop around to re-check the flag.
                continue
            except Exception as e:
                # Never let one bad receipt kill the worker thread.
                if self.config["verbose"]:
                    print(f"[CASCADE] Emission error: {e}")

    def _emit_receipt(self, receipt_data: Dict[str, Any]):
        """Actually write the receipt to lattice.

        Args:
            receipt_data: Dict with keys 'model_id', 'input', 'output', and
                optionally 'metrics', 'context', 'timestamp', 'user_hash'
                (shape produced by observe()).

        Returns:
            The observation object from ObservationManager.observe_model, or
            None if emission failed (errors are swallowed; printed when
            verbose).
        """
        import hashlib
        import uuid
        try:
            # Create provenance chain for this observation
            model_id = receipt_data["model_id"]
            input_text = receipt_data["input"][:1000]  # Truncate
            output_text = receipt_data["output"][:2000]  # Truncate
            # Compute hashes (shortened to 16 hex chars / 64 bits each).
            input_hash = hashlib.sha256(input_text.encode()).hexdigest()[:16]
            model_hash = hashlib.sha256(model_id.encode()).hexdigest()[:16]
            # Random per-receipt session id, truncated to 8 chars.
            session_id = str(uuid.uuid4())[:8]
            chain = ProvenanceChain(
                session_id=session_id,
                model_id=model_id,
                model_hash=model_hash,
                input_hash=input_hash,
            )
            # Add inference record
            from cascade.core.provenance import ProvenanceRecord
            import time
            # NOTE(review): single record modeling the whole inference as one
            # "layer"; stats carries provider metrics plus bookkeeping fields.
            record = ProvenanceRecord(
                layer_name="inference",
                layer_idx=0,
                state_hash=hashlib.sha256(output_text.encode()).hexdigest()[:16],
                parent_hashes=[input_hash],
                params_hash=model_hash,
                shape=[len(output_text)],
                dtype="text",
                stats={
                    **receipt_data.get("metrics", {}),
                    "provider": receipt_data.get("context", {}).get("provider", "unknown"),
                    "timestamp": receipt_data.get("timestamp", datetime.now(timezone.utc).isoformat()),
                },
                execution_order=0,
            )
            chain.add_record(record)
            chain.finalize()
            observation = self.observation_manager.observe_model(
                model_id=model_id,
                chain=chain,
                user_hash=receipt_data.get("user_hash"),
            )
            if self.config["verbose"]:
                print(f"[CASCADE] Receipt: {observation.merkle_root[:16]}... -> {model_id}")
            return observation
        except Exception as e:
            # Receipt emission is best-effort: never propagate into the
            # caller's (patched) inference path.
            if self.config["verbose"]:
                import traceback
                print(f"[CASCADE] Failed to emit: {e}")
                traceback.print_exc()
            return None

    def observe(
        self,
        model_id: str,
        input_data: Any,
        output_data: Any,
        metrics: Optional[Dict] = None,
        context: Optional[Dict] = None
    ):
        """
        Manually emit an observation receipt.
        Called automatically by patches, but can be called directly.

        Args:
            model_id: Identifier of the model that produced the output.
            input_data: Prompt/input; stringified before hashing/truncation.
            output_data: Model output; stringified before hashing/truncation.
            metrics: Optional numeric/info metrics merged into record stats.
            context: Optional metadata; 'provider' key is recorded if present.
        """
        receipt_data = {
            "model_id": model_id,
            "input": str(input_data),
            "output": str(output_data),
            "metrics": metrics or {},
            "context": context or {},
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # Async mode queues for the worker thread; otherwise emit inline.
        # NOTE(review): if emit_async is True but init() was never called,
        # items queue up with no worker until shutdown() flushes them.
        if self.config["emit_async"]:
            self.emission_queue.put(receipt_data)
        else:
            self._emit_receipt(receipt_data)

    def shutdown(self):
        """Stop background emission and flush queue."""
        # Clearing the flag lets _emission_worker's loop exit (within its
        # 1-second poll timeout).
        self.running = False
        if self.background_thread:
            self.background_thread.join(timeout=5.0)
        # Flush remaining items
        while not self.emission_queue.empty():
            try:
                receipt_data = self.emission_queue.get_nowait()
                self._emit_receipt(receipt_data)
            except queue.Empty:
                # Raced with another consumer between empty() and get_nowait().
                break
# Module-level singleton backing the convenience functions below.
_sdk = CascadeSDK()


def init(**kwargs):
    """Initialize CASCADE observation layer (delegates to CascadeSDK.init)."""
    return _sdk.init(**kwargs)


def observe(model_id: str, input_data: Any, output_data: Any, **kwargs):
    """Manually emit an observation through the global SDK instance."""
    return _sdk.observe(model_id, input_data, output_data, **kwargs)


def shutdown():
    """Flush pending receipts and stop the background worker."""
    return _sdk.shutdown()


# Public surface for the `import cascade; cascade.init()` usage pattern.
__all__ = ["init", "observe", "shutdown", "CascadeSDK"]