Spaces:
Running
Running
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from time import time | |
| from typing import Any | |
| class CommsMessage: | |
| sender: str | |
| receiver: str | |
| payload: dict[str, Any] | |
| timestamp: float = field(default_factory=time) | |
| class CommsBus: | |
| """ | |
| Lightweight message log for partial-observability experiments. | |
| The environment keeps hidden specialist metadata internally, while the | |
| orchestrator-facing view only exposes the public response, confidence, and | |
| outcome summary. This makes the trust problem behavioral instead of identity | |
| based. | |
| """ | |
| def __init__(self, partial_observability: bool = True) -> None: | |
| self.partial_observability = partial_observability | |
| self._messages: list[CommsMessage] = [] | |
| def reset(self) -> None: | |
| self._messages.clear() | |
| def route(self, sender: str, receiver: str, payload: dict[str, Any]) -> dict[str, Any]: | |
| visible_payload = self._filter_payload(payload) | |
| self._messages.append( | |
| CommsMessage(sender=sender, receiver=receiver, payload=visible_payload) | |
| ) | |
| return visible_payload | |
| def history(self, receiver: str | None = None) -> list[dict[str, Any]]: | |
| messages = self._messages | |
| if receiver is not None: | |
| messages = [msg for msg in messages if msg.receiver == receiver] | |
| return [ | |
| { | |
| "sender": msg.sender, | |
| "receiver": msg.receiver, | |
| "payload": dict(msg.payload), | |
| "timestamp": round(msg.timestamp, 3), | |
| } | |
| for msg in messages | |
| ] | |
| def _filter_payload(self, payload: dict[str, Any]) -> dict[str, Any]: | |
| if not self.partial_observability: | |
| return dict(payload) | |
| hidden_keys = { | |
| "internal_id", | |
| "internal_profile", | |
| "ground_truth_reliability", | |
| "adversarial_slot", | |
| } | |
| return {key: value for key, value in payload.items() if key not in hidden_keys} | |