Condensate / graph_builder.py
Executor-Tyrant-Framework's picture
Condensate PoC β€” all 4 layers + HF Spaces demo
262b9d5
"""
Condensate Layer 1: The Graph Builder
Takes access logs from the Membrane (Layer 0) and builds a weighted
graph of memory access patterns. Discovers:
- Temporal edges: A accessed near B β†’ weighted edge
- Causal chains: A always before B β†’ directed edge with timing
- Clusters: groups of regions always accessed together (proto-hyperedges)
- Hot/cold classification: access frequency distribution
This is the substrate's raw material. Layer 2 (predictor) will use
this graph to predict future accesses.
Usage:
from membrane import Membrane
from graph_builder import GraphBuilder
# ... run workload with Membrane wrapping ...
log = Membrane.get_log()
graph = GraphBuilder()
graph.build(log)
graph.print_analysis()
graph.save("access_graph.json")
"""
import numpy as np
from collections import defaultdict
import json
class AccessNode:
"""A memory region tracked in the graph."""
__slots__ = ['path', 'access_count', 'read_count', 'write_count',
'total_bytes', 'first_access_ns', 'last_access_ns',
'access_times_ns', '_temp_class']
def __init__(self, path):
self.path = path
self.access_count = 0
self.read_count = 0
self.write_count = 0
self.total_bytes = 0
self.first_access_ns = float('inf')
self.last_access_ns = 0
self.access_times_ns = []
self._temp_class = "WARM" # default
def record(self, ts_ns, event_type, size_bytes):
self.access_count += 1
if event_type == "READ":
self.read_count += 1
else:
self.write_count += 1
self.total_bytes += size_bytes
self.first_access_ns = min(self.first_access_ns, ts_ns)
self.last_access_ns = max(self.last_access_ns, ts_ns)
self.access_times_ns.append(ts_ns)
@property
def temperature(self):
"""Normalized access frequency. Higher = hotter."""
return self.access_count
def to_dict(self):
return {
"path": self.path,
"access_count": self.access_count,
"reads": self.read_count,
"writes": self.write_count,
"total_bytes": self.total_bytes,
}
class CausalEdge:
"""A directed edge: source is accessed BEFORE target."""
__slots__ = ['source', 'target', 'count', 'timing_deltas_ns',
'mean_delta_ns', 'std_delta_ns', 'weight']
def __init__(self, source, target):
self.source = source
self.target = target
self.count = 0
self.timing_deltas_ns = []
self.mean_delta_ns = 0.0
self.std_delta_ns = 0.0
self.weight = 0.0 # computed after all edges built
def add_observation(self, delta_ns):
self.count += 1
self.timing_deltas_ns.append(delta_ns)
def finalize(self):
"""Compute statistics after all observations."""
if self.timing_deltas_ns:
arr = np.array(self.timing_deltas_ns, dtype=np.float64)
self.mean_delta_ns = float(np.mean(arr))
self.std_delta_ns = float(np.std(arr))
# Weight: frequency Γ— timing consistency
# High count + low variance = strong causal edge
consistency = 1.0 / (1.0 + self.std_delta_ns / max(self.mean_delta_ns, 1.0))
self.weight = self.count * consistency
def to_dict(self):
return {
"source": self.source,
"target": self.target,
"count": self.count,
"mean_delta_ms": round(self.mean_delta_ns / 1_000_000, 3),
"std_delta_ms": round(self.std_delta_ns / 1_000_000, 3),
"weight": round(self.weight, 2),
}
class Cluster:
"""A group of paths always accessed together β€” proto-hyperedge."""
def __init__(self, cluster_id, members):
self.cluster_id = cluster_id
self.members = set(members)
self.total_coaccesses = 0
def to_dict(self):
return {
"id": self.cluster_id,
"members": sorted(self.members),
"size": len(self.members),
"total_coaccesses": self.total_coaccesses,
}
class GraphBuilder:
"""Builds a weighted access pattern graph from Membrane logs.
The graph has:
- Nodes: memory regions (paths) with access statistics
- Causal edges: directed, weighted, with timing information
- Clusters: groups of paths that always co-access (proto-hyperedges)
"""
def __init__(self, causal_window_ns=5_000_000, cluster_threshold=0.7):
"""
Args:
causal_window_ns: Max time gap (ns) to consider causal.
Default 5ms β€” wide enough for Python overhead.
cluster_threshold: Co-access ratio to form a cluster.
0.7 = paths must co-access 70%+ of the time.
"""
self.causal_window_ns = causal_window_ns
self.cluster_threshold = cluster_threshold
self.nodes = {} # path β†’ AccessNode
self.edges = {} # (source, target) β†’ CausalEdge
self.clusters = [] # list of Cluster
self._built = False
def build(self, log_entries):
"""Build the graph from Membrane log entries.
Args:
log_entries: list of (timestamp_ns, event_type, path, size_bytes)
"""
if not log_entries:
print(" Warning: empty log, nothing to build")
return
# Phase 1: Build nodes
for ts, event_type, path, size_bytes in log_entries:
if path not in self.nodes:
self.nodes[path] = AccessNode(path)
self.nodes[path].record(ts, event_type, size_bytes)
# Phase 2: Build causal edges
# Sort by timestamp for sequential scanning
sorted_log = sorted(log_entries, key=lambda e: e[0])
for i, (ts_i, _, path_i, _) in enumerate(sorted_log):
# Look forward within the causal window
for j in range(i + 1, len(sorted_log)):
ts_j, _, path_j, _ = sorted_log[j]
delta = ts_j - ts_i
if delta > self.causal_window_ns:
break # past the window
if path_i == path_j:
continue # self-loop, skip
# Directed edge: i happened before j
key = (path_i, path_j)
if key not in self.edges:
self.edges[key] = CausalEdge(path_i, path_j)
self.edges[key].add_observation(delta)
# Finalize edge statistics
for edge in self.edges.values():
edge.finalize()
# Phase 3: Discover clusters (proto-hyperedges)
self._discover_clusters()
# Phase 4: Classify temperature
self._classify_temperature()
self._built = True
def _discover_clusters(self):
"""Find groups of paths that are consistently co-accessed.
Uses a simple greedy approach:
1. For each pair of paths, compute co-access ratio
2. Build adjacency from pairs above threshold
3. Connected components = clusters
"""
if len(self.nodes) < 2:
return
paths = list(self.nodes.keys())
n = len(paths)
# Build co-access matrix
# co_access[i][j] = times i and j were accessed within window / min(count_i, count_j)
path_to_idx = {p: i for i, p in enumerate(paths)}
cocount = np.zeros((n, n), dtype=np.int32)
for (src, tgt), edge in self.edges.items():
i, j = path_to_idx.get(src), path_to_idx.get(tgt)
if i is not None and j is not None:
cocount[i][j] += edge.count
cocount[j][i] += edge.count
# Normalize to co-access ratio
counts = np.array([self.nodes[p].access_count for p in paths], dtype=np.float64)
min_counts = np.minimum.outer(counts, counts)
min_counts = np.maximum(min_counts, 1.0) # avoid div by zero
coratio = cocount / min_counts
# Build adjacency and find connected components
adjacency = defaultdict(set)
for i in range(n):
for j in range(i + 1, n):
if coratio[i][j] >= self.cluster_threshold:
adjacency[i].add(j)
adjacency[j].add(i)
# BFS to find connected components
visited = set()
cluster_id = 0
for start in range(n):
if start in visited:
continue
if start not in adjacency:
continue
# BFS
component = set()
queue = [start]
while queue:
node = queue.pop(0)
if node in visited:
continue
visited.add(node)
component.add(node)
for neighbor in adjacency.get(node, []):
if neighbor not in visited:
queue.append(neighbor)
if len(component) >= 2:
members = [paths[i] for i in component]
cluster = Cluster(cluster_id, members)
# Sum co-access counts within cluster
for i in component:
for j in component:
if i != j:
cluster.total_coaccesses += cocount[i][j]
self.clusters.append(cluster)
cluster_id += 1
def _classify_temperature(self):
"""Tag nodes as hot/warm/cold based on access distribution."""
if not self.nodes:
return
counts = [n.access_count for n in self.nodes.values()]
if not counts:
return
# Use percentiles for classification
p75 = np.percentile(counts, 75)
p25 = np.percentile(counts, 25)
for node in self.nodes.values():
if node.access_count >= p75:
node._temp_class = "HOT"
elif node.access_count >= p25:
node._temp_class = "WARM"
else:
node._temp_class = "COLD"
def get_causal_chains(self, min_weight=2.0, max_depth=10):
"""Extract causal chains — sequences of A→B→C with strong edges.
Returns list of chains, each chain is [(path, mean_delta_ms), ...]
"""
if not self._built:
return []
# Build adjacency list of strong edges, sorted by weight
successors = defaultdict(list)
for (src, tgt), edge in self.edges.items():
if edge.weight >= min_weight:
successors[src].append((tgt, edge))
# Sort successors by weight descending
for src in successors:
successors[src].sort(key=lambda x: -x[1].weight)
# Find chains starting from each node
chains = []
visited_starts = set()
# Start from nodes that have strong outgoing but weak incoming
incoming_weight = defaultdict(float)
outgoing_weight = defaultdict(float)
for (src, tgt), edge in self.edges.items():
if edge.weight >= min_weight:
outgoing_weight[src] += edge.weight
incoming_weight[tgt] += edge.weight
# Good chain starts: strong outgoing, weaker incoming
candidates = []
for path in successors:
out_w = outgoing_weight.get(path, 0)
in_w = incoming_weight.get(path, 0)
if out_w > 0:
candidates.append((path, out_w - in_w))
candidates.sort(key=lambda x: -x[1])
for start, _ in candidates:
if start in visited_starts:
continue
# Follow the strongest chain
chain = [(start, 0.0)]
current = start
seen = {start}
for _ in range(max_depth):
if current not in successors:
break
# Take the strongest unvisited successor
found = False
for next_path, edge in successors[current]:
if next_path not in seen:
chain.append((next_path, edge.mean_delta_ns / 1_000_000))
seen.add(next_path)
current = next_path
found = True
break
if not found:
break
if len(chain) >= 2:
chains.append(chain)
visited_starts.update(p for p, _ in chain)
return chains
def print_analysis(self):
"""Print a comprehensive analysis of the access graph."""
if not self._built:
print(" Graph not built yet. Call build() first.")
return
print(f"\n{'='*60}")
print(f" CONDENSATE β€” Layer 1 Graph Analysis")
print(f"{'='*60}")
# Node summary
hot = [n for n in self.nodes.values() if getattr(n, '_temp_class', '') == 'HOT']
warm = [n for n in self.nodes.values() if getattr(n, '_temp_class', '') == 'WARM']
cold = [n for n in self.nodes.values() if getattr(n, '_temp_class', '') == 'COLD']
print(f"\n Nodes: {len(self.nodes)} total")
print(f" HOT: {len(hot)} (top 25% access frequency)")
print(f" WARM: {len(warm)} (middle 50%)")
print(f" COLD: {len(cold)} (bottom 25%)")
if hot:
print(f"\n Hottest nodes:")
for node in sorted(hot, key=lambda n: -n.access_count)[:10]:
print(f" {node.path:<42} {node.access_count:>5} accesses")
if cold:
print(f"\n Coldest nodes:")
for node in sorted(cold, key=lambda n: n.access_count)[:5]:
print(f" {node.path:<42} {node.access_count:>5} accesses")
# Edge summary
strong_edges = [(k, e) for k, e in self.edges.items() if e.weight >= 2.0]
print(f"\n Edges: {len(self.edges)} total, {len(strong_edges)} strong (weight >= 2.0)")
if strong_edges:
print(f"\n Strongest causal edges (A β†’ B):")
print(f" {'Source':<25} {'β†’ Target':<25} {'Count':>5} {'Ξ”t(ms)':>7} {'Wt':>6}")
print(f" {'-'*25} {'-'*25} {'-'*5} {'-'*7} {'-'*6}")
sorted_edges = sorted(strong_edges, key=lambda x: -x[1].weight)
for (src, tgt), edge in sorted_edges[:15]:
src_short = src if len(src) <= 25 else "..." + src[-22:]
tgt_short = tgt if len(tgt) <= 25 else "..." + tgt[-22:]
print(f" {src_short:<25} {tgt_short:<25} "
f"{edge.count:>5} {edge.mean_delta_ns/1e6:>7.3f} {edge.weight:>6.1f}")
# Cluster summary
if self.clusters:
print(f"\n Clusters (proto-hyperedges): {len(self.clusters)}")
for cluster in sorted(self.clusters, key=lambda c: -len(c.members)):
print(f"\n Cluster {cluster.cluster_id} "
f"({len(cluster.members)} members, "
f"{cluster.total_coaccesses} co-accesses):")
for member in sorted(cluster.members):
node = self.nodes.get(member)
temp = getattr(node, '_temp_class', '?') if node else '?'
count = node.access_count if node else 0
print(f" [{temp:>4}] {member:<40} {count:>4}x")
else:
print(f"\n Clusters: none found (threshold: {self.cluster_threshold})")
# Causal chains
chains = self.get_causal_chains()
if chains:
print(f"\n Causal chains discovered: {len(chains)}")
for i, chain in enumerate(chains[:5]):
parts = []
for path, delta_ms in chain:
short = path.split(".")[-1] if "." in path else path
if delta_ms > 0:
parts.append(f"--({delta_ms:.2f}ms)--> {short}")
else:
parts.append(short)
print(f" Chain {i}: {' '.join(parts)}")
if len(chains) > 5:
print(f" ... and {len(chains) - 5} more chains")
# Condensation potential
if hot and cold:
hot_accesses = sum(n.access_count for n in hot)
total_accesses = sum(n.access_count for n in self.nodes.values())
hot_pct = hot_accesses / total_accesses * 100
print(f"\n Condensation potential:")
print(f" {len(hot)} hot nodes handle {hot_pct:.0f}% of all accesses")
print(f" {len(cold)} cold nodes could be compressed/paged")
if self.clusters:
print(f" {len(self.clusters)} clusters enable batch promote/demote")
if chains:
print(f" {len(chains)} causal chains enable predictive prefetch")
print(f"\n{'='*60}\n")
def save(self, filepath):
"""Save the graph to JSON for later analysis."""
data = {
"nodes": {p: n.to_dict() for p, n in self.nodes.items()},
"edges": [e.to_dict() for e in self.edges.values() if e.weight >= 1.0],
"clusters": [c.to_dict() for c in self.clusters],
"chains": self.get_causal_chains(),
"summary": {
"total_nodes": len(self.nodes),
"total_edges": len(self.edges),
"strong_edges": sum(1 for e in self.edges.values() if e.weight >= 2.0),
"clusters": len(self.clusters),
"chains": len(self.get_causal_chains()),
}
}
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (np.integer,)):
return int(obj)
if isinstance(obj, (np.floating,)):
return float(obj)
return super().default(obj)
with open(filepath, 'w') as f:
json.dump(data, f, indent=2, cls=NumpyEncoder)
print(f" Saved graph ({len(self.nodes)} nodes, "
f"{len(self.edges)} edges) to {filepath}")