| """Real-time thought streaming for AtlasOps agents. |
| |
| Agents emit narrated thoughts as they work. The dashboard subscribes |
| via SSE (/stream endpoint) and displays them live alongside Grafana. |
| """ |
|
|
| import asyncio |
| import json |
| import time |
| from collections import deque |
| from typing import AsyncGenerator |
|
|
|
|
| |
# Queues of the currently connected stream consumers; subscribe() adds one
# per consumer and removes it again when that consumer's generator finishes.
_subscribers: list[asyncio.Queue] = []

# Bounded history of recent ThoughtEvent objects. With maxlen=200 the deque
# silently discards its oldest entry once a 201st event is appended.
_thought_buffer: deque = deque(maxlen=200)
|
|
|
|
class ThoughtEvent:
    """A single narrated thought, timestamped at construction.

    Attributes:
        ts: Creation time as returned by ``time.time()``.
        role: Value passed as ``role``.
        phase: Value passed as ``phase``.
        thought: Value passed as ``thought``.
        tool: Value passed as ``tool`` (empty string by default).
        result_summary: Value passed as ``result_summary`` (empty string by
            default).
    """

    def __init__(self, role: str, phase: str, thought: str, tool: str = "",
                 result_summary: str = ""):
        # Stamp the event first so ``ts`` reflects construction time.
        self.ts = time.time()
        self.role, self.phase, self.thought = role, phase, thought
        self.tool, self.result_summary = tool, result_summary

    def to_dict(self) -> dict:
        """Return the event as a dict with ``ts`` rounded to 3 decimals."""
        payload = {"ts": round(self.ts, 3)}
        # Remaining fields are copied verbatim, in a fixed key order.
        for field in ("role", "phase", "thought", "tool", "result_summary"):
            payload[field] = getattr(self, field)
        return payload

    def to_sse(self) -> str:
        """Return the event as one SSE ``data:`` frame with a JSON payload."""
        body = json.dumps(self.to_dict())
        return "data: " + body + "\n\n"
|
|
|
|
def emit(role: str, phase: str, thought: str, tool: str = "",
         result_summary: str = "") -> None:
    """Record a new thought and hand it to every registered subscriber queue.

    The arguments are forwarded unchanged to ``ThoughtEvent``. The event is
    appended to the history buffer and then offered to each subscriber
    queue without blocking.
    """
    thought_event = ThoughtEvent(role, phase, thought, tool, result_summary)
    _thought_buffer.append(thought_event)
    # Walk a snapshot rather than the live subscriber list.
    for queue in tuple(_subscribers):
        try:
            queue.put_nowait(thought_event)
        except asyncio.QueueFull:
            # Best effort: a subscriber whose queue is full misses this event.
            continue
|
|
|
|
def get_history() -> list[dict]:
    """Return every buffered thought event as a dict, in buffer order."""
    history = []
    for entry in _thought_buffer:
        history.append(entry.to_dict())
    return history
|
|
|
|
async def subscribe() -> AsyncGenerator[str, None]:
    """SSE generator - yields thought events as they arrive.

    The generator first replays the events currently held in the history
    buffer, then streams new events from a per-subscriber queue. Whenever
    30 seconds pass without a new event it yields a heartbeat frame and
    keeps waiting, so an idle stream stays open instead of ending.

    Yields:
        SSE-formatted strings: one ``data:`` frame per thought event, or
        ``data: {"heartbeat": true}`` after an idle period.
    """
    q: asyncio.Queue = asyncio.Queue(maxsize=100)
    # Register the queue and snapshot the backlog with no await in between:
    # events emitted on the event loop after this point reach the queue only,
    # and earlier ones are in the snapshot only.
    _subscribers.append(q)
    backlog = list(_thought_buffer)
    try:
        # Replay happens inside the try so that a consumer closing the
        # generator mid-replay still gets its queue unregistered below.
        for event in backlog:
            yield event.to_sse()
        while True:
            try:
                event = await asyncio.wait_for(q.get(), timeout=30.0)
            except asyncio.TimeoutError:
                # Idle period: send a keep-alive and resume waiting rather
                # than terminating the stream.
                yield "data: {\"heartbeat\": true}\n\n"
                continue
            yield event.to_sse()
    finally:
        _subscribers.remove(q)
|
|