# -*- coding: utf-8 -*- """ pluto/bus.py — Lightweight in-memory message bus for agent communication. Agents write typed messages. Other agents read by role or message type. This is the communication backbone for multi-agent coordination. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Callable import time from fastapi.encoders import jsonable_encoder @dataclass class Message: sender: str # e.g. "planner", "strategist", "worker-C3", "critic" msg_type: str # e.g. "chunk_plan", "audit", "fact_tuple", "challenge", "retraction" payload: dict[str, Any] timestamp: float = field(default_factory=time.perf_counter) class MessageBus: """ Append-only message log shared across all agents in one pipeline run. Each run gets a fresh bus instance — no cross-run state. """ def __init__(self) -> None: self._messages: list[Message] = [] self._listeners: list[Callable[[str, str, dict], None]] = [] def subscribe(self, callback: Callable[[str, str, dict], None]) -> None: """Register a callback(sender, msg_type, payload) for new messages.""" self._listeners.append(callback) def post(self, sender: str, msg_type: str, payload: dict[str, Any]) -> None: """Post a message to the bus.""" safe_payload = jsonable_encoder(payload) msg = Message(sender=sender, msg_type=msg_type, payload=safe_payload) self._messages.append(msg) for cb in self._listeners: try: cb(sender, msg_type, safe_payload) except: pass def read(self, msg_type: str | None = None, sender: str | None = None) -> list[Message]: """Read messages, optionally filtered by type and/or sender.""" msgs = self._messages if msg_type: msgs = [m for m in msgs if m.msg_type == msg_type] if sender: msgs = [m for m in msgs if m.sender == sender] return msgs def latest(self, msg_type: str) -> Message | None: """Return the most recent message of a given type.""" matches = self.read(msg_type=msg_type) return matches[-1] if matches else None def dump(self) -> list[dict]: """Export all messages for tracing/debugging.""" return [ { "sender": m.sender, "type": m.msg_type, "payload": m.payload, "t": round(m.timestamp, 4), } for m in self._messages ]