Cascade / cascade /export /tableau_export.py
tostido's picture
Initial commit - cascade-lattice 0.5.4
77bcbf1
"""
CASCADE → Tableau Export Pipeline
Exports Cascade data in Tableau-friendly formats:
- CSV files (universal)
- Hyper files (native Tableau, optional)
Usage:
from cascade.export import export_for_tableau
# Export all data to a directory
export_for_tableau("./tableau_data")
# Then in Tableau: Connect → Text File → select CSVs
"""
import csv
import json
import os
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
# Try to import Hyper API (optional)
try:
from tableauhyperapi import (
HyperProcess, Telemetry, Connection, CreateMode,
TableDefinition, SqlType, TableName, Inserter
)
HAS_HYPER = True
except ImportError:
HAS_HYPER = False
@dataclass
class EventRow:
    """A single Cascade event flattened into scalar columns for Tableau.

    Field order matters: it is the positional constructor order and, via
    ``__dataclass_fields__``, the column order of ``events.csv``.
    """

    # --- identity and timing -------------------------------------------
    event_id: str
    timestamp: float       # Unix timestamp of the event
    timestamp_iso: str     # same instant rendered as an ISO-8601 string

    # --- origin and payload --------------------------------------------
    component: str
    event_type: str
    data_json: str         # full event payload serialized as JSON text

    # --- common fields lifted out of the payload; None when absent -----
    loss: Optional[float] = None
    accuracy: Optional[float] = None
    learning_rate: Optional[float] = None
    epoch: Optional[int] = None
    step: Optional[int] = None
    tokens: Optional[int] = None
    latency_ms: Optional[float] = None
    error_message: Optional[str] = None
@dataclass
class ChainRow:
    """A provenance chain flattened into scalar columns for Tableau.

    Field order matters: it is the positional constructor order and, via
    ``__dataclass_fields__``, the column order of ``chains.csv``.
    """

    # --- identifiers and hashes ----------------------------------------
    session_id: str
    model_id: str
    model_hash: str
    input_hash: str
    output_hash: Optional[str]   # required field, but may be None
    merkle_root: str

    # --- creation time --------------------------------------------------
    created_at: float            # Unix timestamp
    created_at_iso: str          # same instant as an ISO-8601 string

    # --- summary counts and status --------------------------------------
    record_count: int
    external_links_count: int
    is_verified: bool
@dataclass
class HoldEventRow:
    """A HOLD event and its resolution flattened into columns for Tableau.

    Field order matters: it is the positional constructor order and, via
    ``__dataclass_fields__``, the column order of ``hold_events.csv``.
    """

    # --- identity and timing -------------------------------------------
    hold_id: str
    timestamp: float          # Unix timestamp
    timestamp_iso: str        # same instant as an ISO-8601 string
    brain_id: str

    # --- outcome ---------------------------------------------------------
    state: str                # PENDING, ACCEPTED, OVERRIDDEN, TIMEOUT
    ai_choice: int            # index of the highest-probability action
    ai_confidence: float      # probability of that action
    final_action: int
    was_override: bool
    hold_duration_sec: float
    value_estimate: float
    action_count: int         # number of entries in the action distribution

    # --- optional ---------------------------------------------------------
    override_source: Optional[str] = None
@dataclass
class CausationEdgeRow:
    """One directed edge of the causation graph, flattened for Tableau.

    Field order matters: it is the positional constructor order and, via
    ``__dataclass_fields__``, the column order of ``causation_edges.csv``.
    """

    link_id: str
    from_event_id: str     # source event of the edge
    to_event_id: str       # target event of the edge
    causation_type: str    # temporal, correlation, threshold, direct
    strength: float
    timestamp: float       # Unix timestamp
    timestamp_iso: str     # same instant as an ISO-8601 string
@dataclass
class MetricRow:
    """One point of a metric time series, flattened for Tableau.

    Field order matters: it is the positional constructor order and, via
    ``__dataclass_fields__``, the column order of ``metrics_timeseries.csv``.
    """

    # --- when -------------------------------------------------------------
    timestamp: float       # Unix timestamp
    timestamp_iso: str     # same instant as an ISO-8601 string

    # --- what -------------------------------------------------------------
    metric_name: str
    metric_value: float
    category: str          # TRAINING_DYNAMICS, GRADIENT_HEALTH, etc.
    component: str

    # --- anomaly flags ----------------------------------------------------
    is_anomaly: bool
    anomaly_severity: Optional[str] = None
def _ts_to_iso(ts: float) -> str:
"""Convert Unix timestamp to ISO string."""
try:
return datetime.fromtimestamp(ts).isoformat()
except:
return ""
def _extract_metric_fields(data: Dict) -> Dict[str, Any]:
"""Extract common metric fields from event data."""
return {
"loss": data.get("loss"),
"accuracy": data.get("accuracy") or data.get("acc"),
"learning_rate": data.get("learning_rate") or data.get("lr"),
"epoch": data.get("epoch"),
"step": data.get("step") or data.get("iter"),
"tokens": data.get("tokens") or data.get("total_tokens"),
"latency_ms": data.get("latency_ms") or data.get("latency"),
"error_message": data.get("error") or data.get("message"),
}
class TableauExporter:
    """
    Collect Cascade data in memory and export it for Tableau visualization.

    Rows are accumulated through the ``add_*`` methods and written out by
    ``export`` (CSV files plus a JSON manifest) or ``export_hyper`` (a single
    Hyper file containing the events table).

    ``export`` creates a directory with files ready for Tableau import; a
    table with no rows is skipped:
    - events.csv: All observed events
    - chains.csv: Provenance chains
    - hold_events.csv: HOLD protocol events
    - causation_edges.csv: Graph edges for relationship diagrams
    - metrics_timeseries.csv: Metrics over time
    - manifest.json: Export time, written file paths and row counts

    Example:
        exporter = TableauExporter()
        exporter.add_events(events)
        exporter.add_chains(chains)
        exporter.export("./tableau_data")
    """

    # Category assigned to a metric name by ``add_metrics_from_event`` when
    # the caller supplies no mapping; unknown names fall back to "OTHER".
    _DEFAULT_METRIC_CATEGORIES = {
        "loss": "TRAINING_DYNAMICS",
        "accuracy": "TRAINING_DYNAMICS",
        "lr": "TRAINING_DYNAMICS",
        "learning_rate": "TRAINING_DYNAMICS",
        "grad_norm": "GRADIENT_HEALTH",
        "weight_norm": "WEIGHT_DYNAMICS",
        "tokens": "MEMORY_COMPUTE",
        "latency": "MEMORY_COMPUTE",
    }

    def __init__(self):
        """Start with empty row buffers for every exported table."""
        self.events: List[EventRow] = []
        self.chains: List[ChainRow] = []
        self.hold_events: List[HoldEventRow] = []
        self.causation_edges: List[CausationEdgeRow] = []
        self.metrics: List[MetricRow] = []

    # ------------------------------------------------------------------
    # Row collection
    # ------------------------------------------------------------------
    def add_event(self, event) -> None:
        """Add a Cascade Event.

        Reads ``event_id``, ``timestamp``, ``component`` and ``event_type``
        from *event*. The optional ``data`` mapping is stored as JSON text and
        mined for the common metric columns; a missing or ``None`` ``data``
        is treated as an empty payload.
        """
        data = getattr(event, 'data', None) or {}
        extracted = _extract_metric_fields(data)
        row = EventRow(
            event_id=event.event_id,
            timestamp=event.timestamp,
            timestamp_iso=_ts_to_iso(event.timestamp),
            component=event.component,
            event_type=event.event_type,
            data_json=json.dumps(data),
            **extracted
        )
        self.events.append(row)

    def add_events(self, events) -> None:
        """Add multiple events."""
        for e in events:
            self.add_event(e)

    def add_chain(self, chain, is_verified: bool = True) -> None:
        """Add a ProvenanceChain.

        Args:
            chain: Object exposing ``session_id``, ``model_id``,
                ``model_hash``, ``input_hash``, ``output_hash``,
                ``merkle_root``, ``created_at``, ``records`` and
                ``external_roots``.
            is_verified: Value stored in the ``is_verified`` column; this
                method performs no verification itself.
        """
        row = ChainRow(
            session_id=chain.session_id,
            model_id=chain.model_id,
            model_hash=chain.model_hash,
            input_hash=chain.input_hash,
            output_hash=chain.output_hash,
            merkle_root=chain.merkle_root or "",  # None/empty root -> ""
            created_at=chain.created_at,
            created_at_iso=_ts_to_iso(chain.created_at),
            record_count=len(chain.records),
            external_links_count=len(chain.external_roots),
            is_verified=is_verified,
        )
        self.chains.append(row)

    def add_chains(self, chains) -> None:
        """Add multiple chains (each recorded with ``is_verified=True``)."""
        for c in chains:
            self.add_chain(c)

    @staticmethod
    def _summarize_action_probs(probs) -> tuple:
        """Return ``(argmax index, max probability, action count)`` for *probs*.

        Accepts a NumPy array or a plain list/tuple of numbers. Anything else,
        and any empty, 0-d or non-numeric input, yields ``(0, 0.0, 0)``.
        """
        import numpy as np

        if isinstance(probs, (list, tuple)):
            try:
                probs = np.asarray(probs, dtype=float)
            except (TypeError, ValueError):
                # Non-numeric or ragged sequence: no usable distribution.
                return 0, 0.0, 0
        if not isinstance(probs, np.ndarray) or probs.ndim == 0 or probs.size == 0:
            # argmax/len would raise on empty or 0-d input.
            return 0, 0.0, 0
        return int(np.argmax(probs)), float(np.max(probs)), len(probs)

    def add_hold_event(self, hold_point, resolution) -> None:
        """Add a HOLD event with its resolution.

        Args:
            hold_point: Object exposing ``action_probs``, ``brain_id`` and
                ``value``; ``hold_id`` and ``timestamp`` are optional.
            resolution: Object exposing ``state``, ``action`` and
                ``was_override``; ``hold_duration`` and ``override_source``
                are optional.
        """
        ai_choice, ai_confidence, action_count = self._summarize_action_probs(
            hold_point.action_probs
        )

        has_timestamp = hasattr(hold_point, 'timestamp')
        timestamp = hold_point.timestamp if has_timestamp else 0

        # Build the fallback id lazily: a ``getattr`` default is evaluated
        # eagerly and would raise for hold points that have no timestamp.
        if hasattr(hold_point, 'hold_id'):
            hold_id = hold_point.hold_id
        else:
            hold_id = f"hold_{timestamp}"

        # Enum-like states expose ``.value``; anything else is stringified.
        state = resolution.state
        state_text = state.value if hasattr(state, 'value') else str(state)

        row = HoldEventRow(
            hold_id=hold_id,
            timestamp=timestamp,
            timestamp_iso=_ts_to_iso(timestamp) if has_timestamp else "",
            brain_id=hold_point.brain_id,
            state=state_text,
            ai_choice=ai_choice,
            ai_confidence=ai_confidence,
            final_action=resolution.action,
            was_override=resolution.was_override,
            hold_duration_sec=resolution.hold_duration if hasattr(resolution, 'hold_duration') else 0,
            value_estimate=hold_point.value,
            action_count=action_count,
            override_source=resolution.override_source if hasattr(resolution, 'override_source') else None,
        )
        self.hold_events.append(row)

    def add_causation_link(self, link) -> None:
        """Add a causation graph edge.

        *link* must expose ``from_event``, ``to_event``, ``causation_type``
        and ``strength``. A missing ``link_id`` is synthesized as
        ``"<from_event>_<to_event>"``; a missing ``timestamp`` is stored as 0
        with an empty ISO string.
        """
        has_timestamp = hasattr(link, 'timestamp')
        row = CausationEdgeRow(
            link_id=link.link_id if hasattr(link, 'link_id') else f"{link.from_event}_{link.to_event}",
            from_event_id=link.from_event,
            to_event_id=link.to_event,
            causation_type=link.causation_type,
            strength=link.strength,
            timestamp=link.timestamp if has_timestamp else 0,
            timestamp_iso=_ts_to_iso(link.timestamp) if has_timestamp else "",
        )
        self.causation_edges.append(row)

    def add_causation_links(self, links) -> None:
        """Add multiple causation links."""
        for link in links:
            self.add_causation_link(link)

    def add_metric(self, name: str, value: float, timestamp: float,
                   category: str = "OTHER", component: str = "default",
                   is_anomaly: bool = False,
                   anomaly_severity: Optional[str] = None) -> None:
        """Add a time-series metric point."""
        row = MetricRow(
            timestamp=timestamp,
            timestamp_iso=_ts_to_iso(timestamp),
            metric_name=name,
            metric_value=value,
            category=category,
            component=component,
            is_anomaly=is_anomaly,
            anomaly_severity=anomaly_severity,
        )
        self.metrics.append(row)

    def add_metrics_from_event(self, event,
                               category_map: Optional[Dict[str, str]] = None) -> None:
        """Extract and add all metrics from an event.

        Every ``int``/``float`` entry of ``event.data`` (booleans excluded)
        becomes one metric point stamped with the event's timestamp and
        component. A missing or ``None`` ``data`` contributes nothing.

        Args:
            event: Object exposing ``timestamp`` and ``component`` and,
                optionally, a ``data`` mapping.
            category_map: Metric name -> category; names not in the map are
                filed under ``"OTHER"``. Defaults to a built-in mapping.
        """
        if category_map is None:
            category_map = self._DEFAULT_METRIC_CATEGORIES

        data = getattr(event, 'data', None) or {}
        for key, value in data.items():
            # bool is a subclass of int, so it must be excluded explicitly.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                self.add_metric(
                    name=key,
                    value=float(value),
                    timestamp=event.timestamp,
                    category=category_map.get(key, "OTHER"),
                    component=event.component,
                )

    # ------------------------------------------------------------------
    # CSV export
    # ------------------------------------------------------------------
    def _write_csv(self, path: Path, rows: List, fieldnames: List[str]) -> None:
        """Write rows to CSV with a header line.

        Dataclass rows are converted with ``asdict``; any other row is handed
        to ``csv.DictWriter`` unchanged.
        """
        with open(path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for row in rows:
                writer.writerow(asdict(row) if hasattr(row, '__dataclass_fields__') else row)

    def _export_table(self, output_path: Path, files: Dict[str, str], key: str,
                      filename: str, rows: List, row_cls, label: str) -> None:
        """Write one table to ``output_path / filename`` and record it in *files*."""
        table_path = output_path / filename
        self._write_csv(table_path, rows, list(row_cls.__dataclass_fields__.keys()))
        files[key] = str(table_path)
        print(f"✓ Exported {len(rows)} {label} to {table_path}")

    def export(self, output_dir: str) -> Dict[str, str]:
        """
        Export all data to CSV files.

        The directory is created if needed. Tables without rows are skipped
        (no file is written and no entry is returned); ``manifest.json`` is
        always written.

        Args:
            output_dir: Directory to write CSV files

        Returns:
            Dict mapping data type to file path
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        files: Dict[str, str] = {}

        if self.events:
            self._export_table(output_path, files, "events", "events.csv",
                               self.events, EventRow, "events")
        if self.chains:
            self._export_table(output_path, files, "chains", "chains.csv",
                               self.chains, ChainRow, "chains")
        if self.hold_events:
            self._export_table(output_path, files, "hold_events", "hold_events.csv",
                               self.hold_events, HoldEventRow, "HOLD events")
        if self.causation_edges:
            self._export_table(output_path, files, "causation_edges", "causation_edges.csv",
                               self.causation_edges, CausationEdgeRow, "causation edges")
        if self.metrics:
            self._export_table(output_path, files, "metrics", "metrics_timeseries.csv",
                               self.metrics, MetricRow, "metric points")

        # Write a manifest
        manifest_path = output_path / "manifest.json"
        manifest = {
            "exported_at": datetime.now().isoformat(),
            "files": files,
            "counts": {
                "events": len(self.events),
                "chains": len(self.chains),
                "hold_events": len(self.hold_events),
                "causation_edges": len(self.causation_edges),
                "metrics": len(self.metrics),
            }
        }
        # Explicit encoding keeps the manifest independent of the locale.
        with open(manifest_path, 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2)

        print(f"\n📊 Tableau export complete: {output_path}")
        print(" Open Tableau → Connect → Text File → Select CSVs")
        return files

    # ------------------------------------------------------------------
    # Hyper export
    # ------------------------------------------------------------------
    def export_hyper(self, output_path: str) -> Optional[str]:
        """
        Export to Tableau Hyper format (native, fastest).

        Only the events table is written, with the columns ``event_id``,
        ``timestamp``, ``timestamp_iso``, ``component``, ``event_type``,
        ``loss``, ``accuracy`` and ``tokens``. An existing file at
        *output_path* is replaced.

        Requires: pip install tableauhyperapi

        Args:
            output_path: Path of the ``.hyper`` file to create.

        Returns:
            The path written, or ``None`` when the Hyper API is not installed.
        """
        if not HAS_HYPER:
            print("⚠️ Hyper API not installed. Run: pip install tableauhyperapi")
            return None

        hyper_path = Path(output_path)
        with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
            with Connection(hyper.endpoint, str(hyper_path), CreateMode.CREATE_AND_REPLACE) as conn:
                # Create events table
                if self.events:
                    # Columns must be TableDefinition.Column objects; bare
                    # (name, type) tuples are not accepted by the Hyper API.
                    events_table = TableDefinition(
                        TableName("events"),
                        [
                            TableDefinition.Column("event_id", SqlType.text()),
                            TableDefinition.Column("timestamp", SqlType.double()),
                            TableDefinition.Column("timestamp_iso", SqlType.text()),
                            TableDefinition.Column("component", SqlType.text()),
                            TableDefinition.Column("event_type", SqlType.text()),
                            TableDefinition.Column("loss", SqlType.double()),
                            TableDefinition.Column("accuracy", SqlType.double()),
                            TableDefinition.Column("tokens", SqlType.int()),
                        ]
                    )
                    conn.catalog.create_table(events_table)
                    with Inserter(conn, events_table) as inserter:
                        for e in self.events:
                            inserter.add_row([
                                e.event_id, e.timestamp, e.timestamp_iso,
                                e.component, e.event_type,
                                e.loss, e.accuracy, e.tokens
                            ])
                        inserter.execute()

        print(f"✓ Exported Hyper file: {hyper_path}")
        return str(hyper_path)
# =============================================================================
# Convenience Functions
# =============================================================================
def export_for_tableau(output_dir: str = "./tableau_export",
                       include_sample_data: bool = True) -> Dict[str, str]:
    """
    One-line export of all Cascade data for Tableau.

    Up to 1000 recent observations are loaded from the Cascade store and
    exported as events and metric points. Loading is best-effort: any failure
    is reported on stdout and the export continues with whatever was loaded.

    Args:
        output_dir: Where to write CSV files
        include_sample_data: Generate sample data if no real data was loaded.
            Sample rows are never mixed into an export that contains real
            observations.

    Returns:
        Dict of exported file paths
    """
    exporter = TableauExporter()

    class _ObservationEvent:
        """Expose an observation mapping through the attributes the exporter reads."""

        def __init__(self, o):
            self.event_id = o.get('cid', '')
            self.timestamp = o.get('timestamp', 0)
            self.component = o.get('model_id', 'unknown')
            self.event_type = 'inference'
            self.data = o.get('data', {})

    # Try to load real data from Cascade store
    try:
        # NOTE(review): ``query`` and ``stats`` are unused here; the import is
        # kept because it presumably doubles as a store-availability check.
        from cascade.store import query, stats  # noqa: F401
        from cascade.observation import ObservationManager

        # Get observations
        manager = ObservationManager()
        observations = manager.get_recent(limit=1000)
        for obs in observations:
            event = _ObservationEvent(obs)
            exporter.add_event(event)
            exporter.add_metrics_from_event(event)
        print(f"Loaded {len(observations)} observations from Cascade store")
    except Exception as e:
        # Deliberately broad: a missing or broken store must not abort the export.
        print(f"Note: Could not load Cascade store ({e})")

    # Demo rows are a fallback only; mixing them into real data would
    # silently corrupt the exported dataset.
    if include_sample_data and not exporter.events:
        print("Generating sample data for demo...")
        _add_sample_data(exporter)

    return exporter.export(output_dir)
def _add_sample_data(exporter: TableauExporter) -> None:
    """Fill *exporter* with randomly generated demonstration rows.

    Adds 200 events (plus the metric points derived from them), 20 HOLD
    events, 50 causation edges and 10 provenance chains. Timestamps start one
    hour before the current time. Values come from the unseeded ``random``
    and ``numpy.random`` generators, so every call produces different data.
    """
    import time
    import random

    base_time = time.time() - 3600  # 1 hour ago
    models = ["gpt-4", "claude-3-opus", "llama-3-8b", "mistral-7b"]
    event_types = ["inference", "training_step", "error", "checkpoint"]

    # The stand-in classes below expose only the attributes the exporter reads.

    class SampleEvent:
        """Synthetic event; consecutive events are 18 seconds apart."""

        def __init__(self, n):
            self.event_id = f"evt_{n:06d}"
            self.timestamp = base_time + (n * 18)  # 18 sec apart
            self.component = random.choice(models)
            self.event_type = random.choice(event_types)
            loss = 2.5 - (n * 0.01) + random.uniform(-0.1, 0.1)
            accuracy = min(0.95, 0.5 + (n * 0.002) + random.uniform(-0.02, 0.02))
            tokens = random.randint(100, 2000)
            latency_ms = random.uniform(50, 500)
            self.data = {
                "loss": loss,
                "accuracy": accuracy,
                "tokens": tokens,
                "latency_ms": latency_ms,
                "step": n,
            }

    class SampleHoldPoint:
        """Synthetic HOLD point with a random 4-action distribution."""

        def __init__(self, n):
            import numpy as np
            self.hold_id = f"hold_{n:04d}"
            self.timestamp = base_time + (n * 180)
            self.brain_id = random.choice(models)
            self.action_probs = np.random.dirichlet([1, 1, 1, 1])
            self.value = random.uniform(0.3, 0.9)

    class SampleResolution:
        """Synthetic resolution, either accepted or overridden by a human."""

        def __init__(self, override=False):
            state_name = 'OVERRIDDEN' if override else 'ACCEPTED'
            # Throwaway enum-like object exposing only ``.value``.
            self.state = type('State', (), {'value': state_name})()
            self.action = random.randint(0, 3)
            self.was_override = override
            self.hold_duration = random.uniform(0.5, 10.0)
            self.override_source = "human" if override else None

    class SampleLink:
        """Synthetic edge from sample event ``n`` to sample event ``n + 1``."""

        def __init__(self, n):
            self.link_id = f"link_{n:04d}"
            self.from_event = f"evt_{n:06d}"
            self.to_event = f"evt_{n+1:06d}"
            self.causation_type = random.choice(["temporal", "correlation", "threshold", "direct"])
            self.strength = random.uniform(0.5, 1.0)
            self.timestamp = base_time + (n * 18)

    class SampleChain:
        """Synthetic provenance chain with random hex hashes."""

        def __init__(self, n):
            self.session_id = f"session_{n:04d}"
            self.model_id = random.choice(models)
            self.model_hash = f"{random.randint(0, 0xFFFFFFFF):08x}"
            self.input_hash = f"{random.randint(0, 0xFFFFFFFF):08x}"
            self.output_hash = f"{random.randint(0, 0xFFFFFFFF):08x}"
            self.merkle_root = f"{random.randint(0, 0xFFFFFFFFFFFFFFFF):016x}"
            self.created_at = base_time + (n * 360)
            self.records = [None] * random.randint(5, 50)
            self.external_roots = [f"root_{j}" for j in range(random.randint(0, 3))]

    # Sample events (each also feeds the metrics time series)
    for n in range(200):
        sample = SampleEvent(n)
        exporter.add_event(sample)
        exporter.add_metrics_from_event(sample)

    # Sample HOLD events; roughly a quarter are overridden
    for n in range(20):
        hold = SampleHoldPoint(n)
        resolution = SampleResolution(override=random.random() < 0.25)
        exporter.add_hold_event(hold, resolution)

    # Sample causation edges
    for n in range(50):
        exporter.add_causation_link(SampleLink(n))

    # Sample chains
    for n in range(10):
        exporter.add_chain(SampleChain(n))
def export_events_csv(events, output_path: str) -> str:
    """Export events to CSV.

    Only the parent directory of *output_path* is used: the exporter writes
    ``events.csv`` and ``manifest.json`` there, whatever file name
    *output_path* carries.

    Returns:
        Path of the written events file, or ``""`` if no events were exported.
    """
    target_dir = Path(output_path).parent
    exporter = TableauExporter()
    exporter.add_events(events)
    written = exporter.export(str(target_dir))
    return written.get("events", "")
def export_chains_csv(chains, output_path: str) -> str:
    """Export chains to CSV.

    Only the parent directory of *output_path* is used: the exporter writes
    ``chains.csv`` and ``manifest.json`` there, whatever file name
    *output_path* carries.

    Returns:
        Path of the written chains file, or ``""`` if no chains were exported.
    """
    target_dir = Path(output_path).parent
    exporter = TableauExporter()
    exporter.add_chains(chains)
    written = exporter.export(str(target_dir))
    return written.get("chains", "")
def export_metrics_csv(events, output_path: str) -> str:
    """Export metrics time series to CSV.

    Metric points are derived from the numeric payload of each event. Only
    the parent directory of *output_path* is used: the exporter writes
    ``metrics_timeseries.csv`` and ``manifest.json`` there, whatever file
    name *output_path* carries.

    Returns:
        Path of the written metrics file, or ``""`` if no metric points were
        exported.
    """
    target_dir = Path(output_path).parent
    exporter = TableauExporter()
    for event in events:
        exporter.add_metrics_from_event(event)
    written = exporter.export(str(target_dir))
    return written.get("metrics", "")
def export_hold_events_csv(hold_pairs, output_path: str) -> str:
    """Export HOLD events to CSV. hold_pairs = [(hold_point, resolution), ...]

    Only the parent directory of *output_path* is used: the exporter writes
    ``hold_events.csv`` and ``manifest.json`` there, whatever file name
    *output_path* carries.

    Returns:
        Path of the written HOLD events file, or ``""`` if none were exported.
    """
    target_dir = Path(output_path).parent
    exporter = TableauExporter()
    for hold_point, resolution in hold_pairs:
        exporter.add_hold_event(hold_point, resolution)
    written = exporter.export(str(target_dir))
    return written.get("hold_events", "")
def export_causation_graph_csv(links, output_path: str) -> str:
    """Export causation edges to CSV.

    Only the parent directory of *output_path* is used: the exporter writes
    ``causation_edges.csv`` and ``manifest.json`` there, whatever file name
    *output_path* carries.

    Returns:
        Path of the written edges file, or ``""`` if no edges were exported.
    """
    target_dir = Path(output_path).parent
    exporter = TableauExporter()
    exporter.add_causation_links(links)
    written = exporter.export(str(target_dir))
    return written.get("causation_edges", "")
if __name__ == "__main__":
    # Manual smoke run: export into ./tableau_export with sample data enabled.
    print("Exporting sample data for Tableau...")
    export_for_tableau(output_dir="./tableau_export", include_sample_data=True)