Spaces:
Runtime error
Runtime error
| """ | |
| Delegation Graph — Directed Acyclic Graph enforcement for delegation chains. | |
| Prevents: A → B → A (infinite loops) | |
| Prevents: A → B → C → A (indirect cycles) | |
| Enforces: Maximum delegation depth budget | |
| Provides: Action masking for valid next-call candidates | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from collections import defaultdict, deque | |
| from typing import Optional | |
| class DelegationEdge: | |
| caller_id: str | |
| callee_id: str | |
| depth: int | |
| delegation_mode: str | |
| step: int | |
| class DelegationGraph: | |
| """ | |
| Enforces delegation as a DAG. No cycles, no depth violations. | |
| Design: Built incrementally during an episode. At each step, | |
| before executing an action, the policy checks `can_delegate(caller, callee)`. | |
| If False, the action is masked to zero probability. | |
| """ | |
| def __init__(self, max_depth: int = 2): | |
| self.max_depth = max_depth | |
| self._edges: list[DelegationEdge] = [] | |
| self._adj: dict[str, set[str]] = defaultdict(set) # caller → callees | |
| self._depth_map: dict[str, int] = {} # node_id → depth from root | |
| self._current_depth: int = 0 | |
| self._step: int = 0 | |
| def reset(self) -> None: | |
| """Reset graph for a new episode.""" | |
| self._edges.clear() | |
| self._adj.clear() | |
| self._depth_map.clear() | |
| self._current_depth = 0 | |
| self._step = 0 | |
| def add_root(self, orchestrator_id: str) -> None: | |
| """Register the orchestrator as the root node at depth 0.""" | |
| self._depth_map[orchestrator_id] = 0 | |
| def can_delegate(self, caller_id: str, callee_id: str) -> bool: | |
| """ | |
| Check if caller CAN delegate to callee. | |
| Returns False if: | |
| - Adding this edge would create a cycle | |
| - callee is already at max_depth | |
| - caller == callee (self-delegation) | |
| """ | |
| if caller_id == callee_id: | |
| return False | |
| caller_depth = self._depth_map.get(caller_id, 0) | |
| proposed_callee_depth = caller_depth + 1 | |
| if proposed_callee_depth > self.max_depth: | |
| return False | |
| if self._would_create_cycle(caller_id, callee_id): | |
| return False | |
| return True | |
| def _would_create_cycle(self, caller_id: str, callee_id: str) -> bool: | |
| """ | |
| Check if adding edge (caller → callee) would create a cycle. | |
| Uses DFS from callee to see if we can reach caller. | |
| """ | |
| if callee_id not in self._adj: | |
| return False # callee has no outgoing edges yet | |
| visited = set() | |
| stack = deque([callee_id]) | |
| while stack: | |
| node = stack.pop() | |
| if node == caller_id: | |
| return True | |
| if node in visited: | |
| continue | |
| visited.add(node) | |
| for neighbor in self._adj.get(node, set()): | |
| stack.append(neighbor) | |
| return False | |
| def record_delegation( | |
| self, | |
| caller_id: str, | |
| callee_id: str, | |
| delegation_mode: str, | |
| ) -> None: | |
| """ | |
| Record a delegation edge after validation. | |
| Call ONLY after `can_delegate()` returned True. | |
| """ | |
| if not self.can_delegate(caller_id, callee_id): | |
| raise ValueError( | |
| f"Invalid delegation: {caller_id} → {callee_id} " | |
| f"(would create cycle or exceed depth)" | |
| ) | |
| caller_depth = self._depth_map.get(caller_id, 0) | |
| callee_depth = caller_depth + 1 | |
| self._adj[caller_id].add(callee_id) | |
| self._depth_map[callee_id] = callee_depth | |
| self._current_depth = max(self._current_depth, callee_depth) | |
| edge = DelegationEdge( | |
| caller_id=caller_id, | |
| callee_id=callee_id, | |
| depth=callee_depth, | |
| delegation_mode=delegation_mode, | |
| step=self._step, | |
| ) | |
| self._edges.append(edge) | |
| self._step += 1 | |
| def get_valid_callees( | |
| self, caller_id: str, all_specialist_ids: list[str] | |
| ) -> list[str]: | |
| """ | |
| Return the list of specialist IDs that caller can still delegate to. | |
| Used for action masking in the policy. | |
| """ | |
| return [ | |
| sid for sid in all_specialist_ids | |
| if self.can_delegate(caller_id, sid) | |
| ] | |
| def get_called_specialists(self) -> list[str]: | |
| """Return all specialists called so far this episode.""" | |
| called = set() | |
| for edge in self._edges: | |
| called.add(edge.callee_id) | |
| return list(called) | |
| def get_delegation_path(self) -> list[DelegationEdge]: | |
| """Return the full delegation path for this episode.""" | |
| return list(self._edges) | |
| def depth(self) -> int: | |
| return self._current_depth | |
| def edge_count(self) -> int: | |
| return len(self._edges) | |
| def to_adjacency_vector( | |
| self, all_ids: list[str], max_size: int = 10 | |
| ) -> list[float]: | |
| """ | |
| Encode the delegation graph as a flat adjacency vector for the policy. | |
| Shape: (max_size * max_size,) — padded with zeros. | |
| This replaces the GNN layer from the original v3 design. | |
| An MLP operating on this vector is sufficient for the hackathon demo. | |
| Production would use a proper GNN. | |
| """ | |
| n = min(len(all_ids), max_size) | |
| id_to_idx = {sid: i for i, sid in enumerate(all_ids[:n])} | |
| matrix = [[0.0] * n for _ in range(n)] | |
| for edge in self._edges: | |
| if edge.caller_id in id_to_idx and edge.callee_id in id_to_idx: | |
| i = id_to_idx[edge.caller_id] | |
| j = id_to_idx[edge.callee_id] | |
| matrix[i][j] = 1.0 | |
| flat = [] | |
| for row in matrix: | |
| flat.extend(row) | |
| target_len = max_size * max_size | |
| flat.extend([0.0] * (target_len - len(flat))) | |
| return flat[:target_len] | |
| def is_auditable(self) -> bool: | |
| """ | |
| Returns True if the delegation path has a clear, explainable structure. | |
| Criteria: all edges recorded, no cycles detected, depth ≤ max_depth. | |
| """ | |
| return ( | |
| len(self._edges) > 0 | |
| and self._current_depth <= self.max_depth | |
| ) | |