Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, Iterable, List, Optional | |
@dataclass
class Message:
    """A single record exchanged over a ``MessageBus``.

    The ``@dataclass`` decorator generates the keyword-accepting
    ``__init__`` that ``MessageBus.send`` relies on; without it the class
    only declares annotations and cannot be constructed with field values.
    """

    # Timestamp as a float; MessageBus.send fills it with time.time().
    ts: float
    # Identifier of the party that emitted the message.
    sender: str
    # Identifier of the intended recipient.
    receiver: str
    # Free-form message category label.
    kind: str
    # Arbitrary data attached to the message.
    payload: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Return the message fields as a plain dictionary.

        The payload is returned by reference (not copied), so mutating the
        returned ``"payload"`` value mutates this message as well.
        """
        return {
            "ts": self.ts,
            "sender": self.sender,
            "receiver": self.receiver,
            "kind": self.kind,
            "payload": self.payload,
        }
class MessageBus:
    """In-memory log of ``Message`` objects, kept in the order they were sent."""

    def __init__(self) -> None:
        """Create a bus with no recorded messages."""
        self._messages: List[Message] = []

    def send(
        self,
        *,
        sender: str,
        receiver: str,
        kind: str,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Record a new message stamped with the current time.

        All arguments are keyword-only.

        Args:
            sender: Identifier of the emitting party.
            receiver: Identifier of the intended recipient.
            kind: Message category label.
            payload: Optional data to attach. ``None`` (or any falsy value)
                is stored as a fresh empty dict.
        """
        self._messages.append(
            Message(
                ts=time.time(),
                sender=sender,
                receiver=receiver,
                kind=kind,
                payload=payload or {},
            )
        )

    def iter(self) -> Iterable[Message]:
        """Return an iterator over the recorded messages in send order."""
        # Inside a method body the bare name ``iter`` resolves to the builtin,
        # not to this method (class-scope names are not visible here).
        return iter(self._messages)

    def to_jsonl(self) -> str:
        """Serialize every recorded message as JSON Lines.

        Each message becomes one JSON object followed by a newline.
        ``ensure_ascii=False`` keeps non-ASCII characters unescaped.

        Returns:
            The concatenated lines, or an empty string when no messages have
            been recorded (rather than a lone blank line, which is not a
            valid JSON Lines record).

        Raises:
            TypeError: If a payload contains a value ``json.dumps`` cannot
                serialize.
        """
        return "".join(
            json.dumps(m.to_dict(), ensure_ascii=False) + "\n"
            for m in self._messages
        )