Spaces:
Paused
Paused
File size: 6,293 Bytes
02ff91f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | """
Delegation Graph — Directed Acyclic Graph enforcement for delegation chains.
Prevents: A → B → A (infinite loops)
Prevents: A → B → C → A (indirect cycles)
Enforces: Maximum delegation depth budget
Provides: Action masking for valid next-call candidates
"""
from __future__ import annotations
from dataclasses import dataclass, field
from collections import defaultdict, deque
from typing import Optional
@dataclass
class DelegationEdge:
    """One recorded delegation: a single caller -> callee hand-off."""

    # Node that issued the delegation.
    caller_id: str
    # Node that received the delegation.
    callee_id: str
    # Depth assigned to the callee when the edge was recorded.
    depth: int
    # Caller-supplied mode label, stored verbatim.
    delegation_mode: str
    # Value of the graph's step counter when the edge was recorded.
    step: int
class DelegationGraph:
    """
    Enforces delegation as a DAG. No cycles, no depth violations.

    Design: Built incrementally during an episode. At each step,
    before executing an action, the policy checks `can_delegate(caller, callee)`.
    If False, the action is masked to zero probability.

    Depth bookkeeping is per node: each recorded delegation stores the
    callee's depth as ``caller depth + 1``, overwriting any depth the callee
    had from an earlier delegation. A caller that was never registered is
    treated as being at depth 0.
    """

    def __init__(self, max_depth: int = 2):
        """Create an empty graph whose callees may sit at most at `max_depth`."""
        self.max_depth = max_depth
        self._edges: list[DelegationEdge] = []
        self._adj: dict[str, set[str]] = defaultdict(set)  # caller → callees
        self._depth_map: dict[str, int] = {}  # node_id → last recorded depth
        self._current_depth: int = 0  # deepest callee depth recorded so far
        self._step: int = 0  # index assigned to the next recorded edge

    def reset(self) -> None:
        """Reset graph for a new episode (`max_depth` is kept)."""
        self._edges.clear()
        self._adj.clear()
        self._depth_map.clear()
        self._current_depth = 0
        self._step = 0

    def add_root(self, orchestrator_id: str) -> None:
        """Register the orchestrator as the root node at depth 0."""
        self._depth_map[orchestrator_id] = 0

    def can_delegate(self, caller_id: str, callee_id: str) -> bool:
        """
        Check if caller CAN delegate to callee.

        Returns False if:
        - caller == callee (self-delegation)
        - the callee would sit deeper than `max_depth`, i.e. the caller's
          recorded depth + 1 exceeds `max_depth`
        - adding this edge would create a cycle

        This method does not modify the graph.
        """
        if caller_id == callee_id:
            return False
        # Unregistered callers default to depth 0 (same as the root).
        caller_depth = self._depth_map.get(caller_id, 0)
        if caller_depth + 1 > self.max_depth:
            return False
        return not self._would_create_cycle(caller_id, callee_id)

    def _would_create_cycle(self, caller_id: str, callee_id: str) -> bool:
        """
        Check if adding edge (caller → callee) would create a cycle.

        Uses an iterative DFS from callee over the recorded edges to see
        whether caller is reachable.
        """
        if callee_id not in self._adj:
            return False  # callee has no outgoing edges yet
        visited: set[str] = set()
        stack = [callee_id]
        while stack:
            node = stack.pop()
            if node == caller_id:
                return True
            if node in visited:
                continue
            visited.add(node)
            # .get() avoids inserting empty entries into the defaultdict.
            stack.extend(self._adj.get(node, ()))
        return False

    def record_delegation(
        self,
        caller_id: str,
        callee_id: str,
        delegation_mode: str,
    ) -> None:
        """
        Record a delegation edge after validation.

        Intended to be called after `can_delegate()` returned True; the check
        is repeated here as a safeguard.

        Raises:
            ValueError: if `can_delegate(caller_id, callee_id)` is False.
        """
        if not self.can_delegate(caller_id, callee_id):
            raise ValueError(
                f"Invalid delegation: {caller_id} → {callee_id} "
                "(would create cycle or exceed depth)"
            )
        callee_depth = self._depth_map.get(caller_id, 0) + 1
        self._adj[caller_id].add(callee_id)
        # Overwrites any depth the callee had from an earlier delegation.
        self._depth_map[callee_id] = callee_depth
        self._current_depth = max(self._current_depth, callee_depth)
        self._edges.append(
            DelegationEdge(
                caller_id=caller_id,
                callee_id=callee_id,
                depth=callee_depth,
                delegation_mode=delegation_mode,
                step=self._step,
            )
        )
        self._step += 1

    def get_valid_callees(
        self, caller_id: str, all_specialist_ids: list[str]
    ) -> list[str]:
        """
        Return the list of specialist IDs that caller can still delegate to.

        Used for action masking in the policy. The order of
        `all_specialist_ids` is preserved.
        """
        return [
            sid for sid in all_specialist_ids
            if self.can_delegate(caller_id, sid)
        ]

    def get_called_specialists(self) -> list[str]:
        """
        Return all specialists called so far this episode.

        Each callee appears once, in the order it was first called, so the
        result is deterministic across runs.
        """
        # dict.fromkeys de-duplicates while keeping insertion order.
        return list(dict.fromkeys(edge.callee_id for edge in self._edges))

    def get_delegation_path(self) -> list[DelegationEdge]:
        """Return a copy of the recorded edges, in recording order."""
        return list(self._edges)

    @property
    def depth(self) -> int:
        """Deepest callee depth recorded since the last reset."""
        return self._current_depth

    @property
    def edge_count(self) -> int:
        """Number of recorded edges (repeated delegations count each time)."""
        return len(self._edges)

    def to_adjacency_vector(
        self, all_ids: list[str], max_size: int = 10
    ) -> list[float]:
        """
        Encode the delegation graph as a flat adjacency vector for the policy.

        Shape: (max_size * max_size,) — padded with zeros.

        Only the first ``n = min(len(all_ids), max_size)`` ids are encoded;
        edges touching any other id are dropped.

        NOTE: the n x n matrix is flattened row-major with row stride ``n``
        (not ``max_size``) and the padding is appended at the tail, so the
        position of a given (caller, callee) pair depends on ``n``.

        This replaces the GNN layer from the original v3 design.
        An MLP operating on this vector is sufficient for the hackathon demo.
        Production would use a proper GNN.
        """
        n = min(len(all_ids), max_size)
        id_to_idx = {sid: i for i, sid in enumerate(all_ids[:n])}
        matrix = [[0.0] * n for _ in range(n)]
        for edge in self._edges:
            if edge.caller_id in id_to_idx and edge.callee_id in id_to_idx:
                matrix[id_to_idx[edge.caller_id]][id_to_idx[edge.callee_id]] = 1.0
        flat = [cell for row in matrix for cell in row]
        target_len = max_size * max_size
        flat.extend([0.0] * (target_len - len(flat)))
        return flat[:target_len]

    def is_auditable(self) -> bool:
        """
        Returns True if the delegation path has a clear, explainable structure.

        Criteria checked here: at least one edge recorded and the deepest
        recorded depth ≤ max_depth. Acyclicity is not re-checked; it relies on
        `record_delegation()` rejecting cycle-forming edges.
        """
        return (
            len(self._edges) > 0
            and self._current_depth <= self.max_depth
        )
|