Spaces:
Running
Running
File size: 1,233 Bytes
11ac7be | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | from __future__ import annotations
import json
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional
@dataclass(frozen=True)
class Message:
    """A single message record: who sent what to whom, and when.

    The dataclass is frozen, so fields cannot be reassigned after
    construction. ``payload`` is nevertheless a mutable dict, which is why
    :meth:`to_dict` hands out a copy instead of the stored object.
    """

    ts: float  # timestamp; ``MessageBus.send`` fills it with ``time.time()``
    sender: str  # name of the sending party
    receiver: str  # name of the intended recipient
    kind: str  # free-form message type tag (not validated here)
    payload: Dict[str, Any]  # message body

    def to_dict(self) -> Dict[str, Any]:
        """Return the message as a plain dict keyed by field name.

        Returns:
            A new dict with the keys ``ts``, ``sender``, ``receiver``,
            ``kind`` and ``payload``, in that order. ``payload`` is a
            shallow copy of the stored dict; nested containers inside it
            are still shared with the message.
        """
        return {
            "ts": self.ts,
            "sender": self.sender,
            "receiver": self.receiver,
            "kind": self.kind,
            # Copy so that callers editing the result cannot alter the
            # payload held by this (frozen) message.
            "payload": dict(self.payload),
        }
class MessageBus:
    """In-memory log of :class:`Message` records.

    ``send`` only records a message in an internal list; nothing in this
    class delivers it to the receiver. Recorded messages can be iterated
    in send order or dumped as JSON Lines text.
    """

    def __init__(self) -> None:
        """Create a bus with no recorded messages."""
        self._messages: List[Message] = []

    def send(
        self,
        *,
        sender: str,
        receiver: str,
        kind: str,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Record a new message stamped with the current ``time.time()``.

        All arguments are keyword-only.

        Args:
            sender: Name of the sending party.
            receiver: Name of the intended recipient.
            kind: Message type tag.
            payload: Message body. ``None`` or an empty dict is stored as
                a fresh empty dict.
        """
        self._messages.append(
            Message(
                ts=time.time(),
                sender=sender,
                receiver=receiver,
                kind=kind,
                # Store a shallow copy: if the caller keeps using and
                # mutating its dict, the already-logged message must not
                # change with it.
                payload=dict(payload) if payload else {},
            )
        )

    def iter(self) -> Iterable[Message]:
        """Return an iterator over the recorded messages in send order.

        The iterator walks the bus's internal list directly; no copy is
        made.
        """
        # ``iter`` here is the builtin: method bodies do not see class
        # attributes as bare names, so this is not a recursive call.
        return iter(self._messages)

    def to_jsonl(self) -> str:
        """Serialize every recorded message as JSON Lines text.

        Returns:
            One JSON object per message, each terminated by ``"\\n"``.
            Non-ASCII characters are written as-is rather than escaped.
            An empty bus yields ``""`` (zero records), not a blank line.

        Raises:
            TypeError: If a payload holds a value ``json.dumps`` cannot
                serialize.
        """
        # Terminate each record individually instead of joining and then
        # appending a newline; the latter produced "\n" for an empty bus.
        return "".join(
            json.dumps(m.to_dict(), ensure_ascii=False) + "\n"
            for m in self._messages
        )
|