File size: 2,458 Bytes
7e9a520
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Real-time thought streaming for AtlasOps agents.

Agents emit narrated thoughts as they work. The dashboard subscribes
via SSE (/stream endpoint) and displays them live alongside Grafana.
"""

import asyncio
import json
import time
from collections import deque
from typing import AsyncGenerator


# In-memory ring buffer — last 200 thought events
_thought_buffer: deque = deque(maxlen=200)
_subscribers: list[asyncio.Queue] = []


class ThoughtEvent:
    def __init__(self, role: str, phase: str, thought: str, tool: str = "",
                 result_summary: str = ""):
        self.ts        = time.time()
        self.role      = role       # triage / diagnosis / remediation / comms
        self.phase     = phase      # thinking / tool_call / tool_result / waiting_approval / conclusion
        self.thought   = thought    # human-readable narration
        self.tool      = tool       # tool name if phase=tool_call
        self.result_summary = result_summary

    def to_dict(self) -> dict:
        return {
            "ts":             round(self.ts, 3),
            "role":           self.role,
            "phase":          self.phase,
            "thought":        self.thought,
            "tool":           self.tool,
            "result_summary": self.result_summary,
        }

    def to_sse(self) -> str:
        return f"data: {json.dumps(self.to_dict())}\n\n"


def emit(role: str, phase: str, thought: str, tool: str = "",
         result_summary: str = "") -> None:
    """Emit a thought event to all live subscribers + buffer."""
    event = ThoughtEvent(role, phase, thought, tool, result_summary)
    _thought_buffer.append(event)
    for q in list(_subscribers):
        try:
            q.put_nowait(event)
        except asyncio.QueueFull:
            pass


def get_history() -> list[dict]:
    return [e.to_dict() for e in _thought_buffer]


async def subscribe() -> AsyncGenerator[str, None]:
    """SSE generator — yields thought events as they arrive."""
    q: asyncio.Queue = asyncio.Queue(maxsize=100)
    _subscribers.append(q)
    # Send buffered history first so new subscribers catch up
    for event in list(_thought_buffer):
        yield event.to_sse()
    try:
        while True:
            event = await asyncio.wait_for(q.get(), timeout=30.0)
            yield event.to_sse()
    except asyncio.TimeoutError:
        yield "data: {\"heartbeat\": true}\n\n"
    finally:
        _subscribers.remove(q)