from __future__ import annotations

from dataclasses import dataclass, field
from time import time
from typing import Any


@dataclass(frozen=True)
class CommsMessage:
    """One routed message; ``timestamp`` defaults to the current Unix time."""

    sender: str
    receiver: str
    payload: dict[str, Any]
    timestamp: float = field(default_factory=time)


class CommsBus:
    """
    Lightweight message log for partial-observability experiments.

    The environment keeps hidden specialist metadata internal, while the
    orchestrator-facing view exposes only the public response, confidence, and
    outcome summary. The orchestrator must therefore judge trust from observed
    behavior rather than from specialist identity.
    """

    def __init__(self, partial_observability: bool = True) -> None:
        self.partial_observability = partial_observability
        self._messages: list[CommsMessage] = []

    def reset(self) -> None:
        self._messages.clear()

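    def route(self, sender: str, receiver: str, payload: dict[str, Any]) -> dict[str, Any]:
        """Log the receiver-visible payload and return it."""
        visible_payload = self._filter_payload(payload)
        # Store a copy so later mutation of the returned dict cannot alter the log.
        self._messages.append(
            CommsMessage(sender=sender, receiver=receiver, payload=dict(visible_payload))
        )
        return visible_payload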

    def history(self, receiver: str | None = None) -> list[dict[str, Any]]:
        """Return logged messages as dicts, optionally only those sent to ``receiver``."""
        messages = self._messages
        if receiver is not None:
            messages = [msg for msg in messages if msg.receiver == receiver]
        return [
            {
                "sender": msg.sender,
                "receiver": msg.receiver,
                "payload": dict(msg.payload),
                "timestamp": round(msg.timestamp, 3),
            }
            for msg in messages
        ]

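    def _filter_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Return a shallow copy of ``payload`` without hidden top-level keys."""
        if not self.partial_observability:
            return dict(payload)

        # Environment-only metadata that must not reach the orchestrator.
        hidden_keys = {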
            "internal_id",
            "internal_profile",
            "ground_truth_reliability",
            "adversarial_slot",
        }
        return {key: value for key, value in payload.items() if key not in hidden_keys}
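

# A minimal sketch of a sanity check, assuming callers may want to confirm that
# no hidden metadata reached the orchestrator-facing log. Both names below,
# DEFAULT_HIDDEN_KEYS and find_leaked_keys, are hypothetical additions and not
# part of the original module; the key set simply mirrors the one used in
# CommsBus._filter_payload. Only top-level payload keys are inspected.
DEFAULT_HIDDEN_KEYS = frozenset(
    {
        "internal_id",
        "internal_profile",
        "ground_truth_reliability",
        "adversarial_slot",
    }
)


def find_leaked_keys(
    history: list[dict], hidden_keys: frozenset = DEFAULT_HIDDEN_KEYS
) -> list[str]:
    """Return the sorted hidden keys present in any payload of a history() result."""
    leaked: set = set()
    for entry in history:
        # Intersecting with a dict compares against its keys only.
        leaked.update(hidden_keys.intersection(entry.get("payload", {})))
    return sorted(leaked)


# Possible usage: find_leaked_keys(bus.history()) on a bus created with
# partial_observability=True should come back empty, since route() logs only
# the filtered payload.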