File size: 9,277 Bytes
3193174
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
Streaming execution β€” no LLM required (uses a local mock).

Demonstrates LangGraph-style streaming with real-time output:
  1. Synchronous streaming β€” iterate over events
  2. Streaming with a StreamBuffer to collect results
  3. Token-level streaming β€” word-by-word output
  4. Async streaming
  5. Async token-level streaming
  6. print_stream() helper β€” handles formatting automatically
  7. Adaptive streaming β€” topology may change during execution

Run:
    python -m examples.streaming_example
"""

import asyncio

from builder import build_property_graph
from core.agent import AgentProfile
from execution import (
    MACPRunner,
    RunnerConfig,
    StreamBuffer,
    StreamEventType,
    format_event,
    print_stream,
)

# ── Sample graph ──────────────────────────────────────────────────────────────


def _create_graph():
    """Build the demo graph: a linear Researcher → Writer → Editor pipeline.

    Three ``AgentProfile`` objects are created in pipeline order and each one
    is linked to its successor by a workflow edge.  The value returned by
    ``build_property_graph`` (called with a fixed query and
    ``include_task_node=True``) is handed back unchanged.
    """
    # One row per agent, in pipeline order:
    # (agent_id, display_name, persona, description)
    roster = (
        (
            "researcher",
            "Research Specialist",
            "You are an AI researcher who explains technical concepts clearly.",
            "Gather and synthesise information about the topic.",
        ),
        (
            "writer",
            "Content Writer",
            "You are a skilled writer who creates engaging content.",
            "Transform research into readable content.",
        ),
        (
            "editor",
            "Editor",
            "You are an editor who ensures clarity and quality.",
            "Review and polish the final content.",
        ),
    )
    profiles = [
        AgentProfile(
            agent_id=agent_id,
            display_name=display_name,
            persona=persona,
            description=description,
        )
        for agent_id, display_name, persona, description in roster
    ]

    # Pair every agent id with the next one:
    # [("researcher", "writer"), ("writer", "editor")]
    stage_ids = [row[0] for row in roster]
    pipeline_edges = list(zip(stage_ids, stage_ids[1:]))

    return build_property_graph(
        profiles,
        workflow_edges=pipeline_edges,
        query="Explain how AI works in simple terms",
        include_task_node=True,
    )


# ── Mock LLM callables ────────────────────────────────────────────────────────


def _mock_llm(prompt: str) -> str:
    """Return a canned response based on which agent is running."""
    p = prompt.lower()
    if "researcher" in p or "research" in p:
        return (
            "AI works by using mathematical models trained on large amounts of data "
            "to recognise patterns and make predictions. Key components include "
            "neural networks, machine learning algorithms, and training data. "
            "Modern AI systems like ChatGPT use transformer architectures."
        )
    if "writer" in p or "content" in p:
        return (
            "## How AI Works: A Simple Guide\n\n"
            "Imagine teaching a child to recognise cats by showing thousands of pictures. "
            "AI works similarly β€” it learns from examples!\n\n"
            "**The Basics:**\n"
            "- Neural networks inspired by the human brain\n"
            "- Learns patterns from massive data\n"
            "- Makes predictions on new data"
        )
    return (
        "# How AI Works: A Simple Guide\n\n"
        "AI learns from examples, just like humans! By analysing patterns in data, "
        "AI systems can recognise images, understand language, and have conversations.\n\n"
        "Content reviewed and polished for clarity."
    )


def _mock_streaming_llm(prompt: str):
    """Simulate token streaming by yielding the canned reply word by word.

    The reply from ``_mock_llm`` is split on single spaces.  Every chunk but
    the last is yielded with one trailing space, so concatenating all chunks
    reproduces the reply exactly.
    """
    # str.split(" ") always returns at least one element, so the unpack is safe.
    *leading, final = _mock_llm(prompt).split(" ")
    for chunk in leading:
        yield f"{chunk} "
    yield final


async def _mock_async_llm(prompt: str) -> str:
    """Async variant of ``_mock_llm``: pause for 0.05 s, then return the canned reply."""
    simulated_latency = 0.05
    await asyncio.sleep(simulated_latency)
    reply = _mock_llm(prompt)
    return reply


async def _mock_async_streaming_llm(prompt: str):
    """Async word-by-word stream of the canned reply.

    The reply from ``_mock_llm`` is split on single spaces.  Before each chunk
    is yielded the coroutine sleeps for 0.01 s; every chunk except the last
    carries one trailing space.
    """
    # str.split(" ") always returns at least one element, so the unpack is safe.
    *leading, final = _mock_llm(prompt).split(" ")
    for chunk in leading:
        await asyncio.sleep(0.01)
        yield f"{chunk} "
    await asyncio.sleep(0.01)
    yield final


# ── Examples ──────────────────────────────────────────────────────────────────


def example_sync_streaming():
    """1. Iterate over raw stream events and print each formatted one.

    A ``MACPRunner`` backed by the synchronous mock LLM streams the sample
    graph.  Each event is passed through ``format_event``; events for which
    that returns a falsy value are skipped.
    """
    # Banner uses real UTF-8 characters (the previous text was mojibake).
    print("\n── 1 · Synchronous streaming ──")
    runner = MACPRunner(llm_caller=_mock_llm)
    for event in runner.stream(_create_graph()):
        text = format_event(event)
        if text:
            print(f"  {text}")


def example_buffer():
    """2. Collect every event in a StreamBuffer; print only AGENT_OUTPUT events.

    All events are added to the buffer.  For ``AGENT_OUTPUT`` events the
    agent name and the first 80 characters of the content are printed.  The
    number of buffered events is reported at the end.
    """
    # Banner uses real UTF-8 characters (the previous text was mojibake).
    print("\n── 2 · Streaming with buffer ──")
    runner = MACPRunner(llm_caller=_mock_llm)
    buf = StreamBuffer()
    for event in runner.stream(_create_graph()):
        buf.add(event)
        if event.event_type == StreamEventType.AGENT_OUTPUT:
            name = getattr(event, "agent_name", "?")
            # getattr's default only applies when the attribute is missing;
            # `or ""` also covers an attribute that is present but None.
            content = getattr(event, "content", "") or ""
            print(f"  [{name}] {content[:80]}…")
    print(f"  Buffered events: {len(buf.events)}")


def example_token_streaming():
    """3. Token-level (word-by-word) streaming with a per-agent token count.

    Tokens are echoed on one line per agent as they arrive.  When the next
    agent starts, or when the run ends, the number of tokens received for the
    agent that just finished is appended to its line.
    """
    # Banner uses real UTF-8 characters (the previous text was mojibake).
    print("\n── 3 · Token streaming ──")
    config = RunnerConfig(enable_token_streaming=True)
    runner = MACPRunner(streaming_llm_caller=_mock_streaming_llm, config=config)

    current_agent = None
    tokens = 0
    for event in runner.stream(_create_graph()):
        if event.event_type == StreamEventType.AGENT_START:
            if current_agent:
                # Close the previous agent's token line with its count.
                print(f"  ({tokens} tokens)")
            current_agent = getattr(event, "agent_name", "?")
            tokens = 0
            print(f"  [{current_agent}] ", end="", flush=True)
        elif event.event_type == StreamEventType.TOKEN:
            print(getattr(event, "token", ""), end="", flush=True)
            tokens += 1
        elif event.event_type == StreamEventType.RUN_END:
            if current_agent:
                # The last agent has no following AGENT_START, so its count
                # must be reported here or it would be silently dropped.
                print(f"  ({tokens} tokens)")
            else:
                print()
            # The final answer may be absent or None; fall back to "".
            answer = getattr(event, "final_answer", "") or ""
            print(f"  Done. Answer: {answer[:60]}…")


async def example_async_streaming():
    """4. Async streaming: report agent starts, agent outputs and the final answer.

    Consumes ``runner.astream`` with ``async for``.  Output content is
    truncated to 80 characters and the final answer to 60.
    """
    # Banner uses real UTF-8 characters (the previous text was mojibake).
    print("\n── 4 · Async streaming ──")
    runner = MACPRunner(async_llm_caller=_mock_async_llm)
    async for event in runner.astream(_create_graph()):
        if event.event_type == StreamEventType.AGENT_START:
            print(f"  Starting: {getattr(event, 'agent_name', '?')}")
        elif event.event_type == StreamEventType.AGENT_OUTPUT:
            # `or ""` covers an attribute that is present but None.
            content = getattr(event, "content", "") or ""
            print(f"  Output  : {content[:80]}…")
        elif event.event_type == StreamEventType.RUN_END:
            answer = getattr(event, "final_answer", "") or ""
            print(f"  Done. Answer: {answer[:60]}…")


async def example_async_token_streaming():
    """5. Async token-level streaming.

    Tokens are echoed as they arrive, one output line per agent.  A newline is
    emitted before each agent after the first so that every agent's tokens
    start on a fresh line.
    """
    # Banner uses real UTF-8 characters (the previous text was mojibake).
    print("\n── 5 · Async token streaming ──")
    config = RunnerConfig(enable_token_streaming=True)
    runner = MACPRunner(async_streaming_llm_caller=_mock_async_streaming_llm, config=config)

    current = None
    async for event in runner.astream(_create_graph()):
        if event.event_type == StreamEventType.AGENT_START:
            if current:
                # Terminate the previous agent's token line.
                print()
            current = getattr(event, "agent_name", "?")
            print(f"  [{current}] ", end="", flush=True)
        elif event.event_type == StreamEventType.TOKEN:
            print(getattr(event, "token", ""), end="", flush=True)
        elif event.event_type == StreamEventType.RUN_END:
            print("\n  Done.")


def example_print_stream():
    """6. print_stream() helper — handles formatting automatically.

    The whole event stream is handed to ``print_stream`` with token display
    off and verbose output on; the length of the value it returns is then
    reported (a falsy return value is counted as length 0).
    """
    # Banner uses real UTF-8 characters (the previous text was mojibake).
    print("\n── 6 · print_stream() helper ──")
    runner = MACPRunner(llm_caller=_mock_llm)
    answer = print_stream(runner.stream(_create_graph()), show_tokens=False, verbose=True)
    print(f"  Returned answer length: {len(answer or '')}")


def example_adaptive():
    """7. Adaptive execution — topology may change during the run.

    Same loop as the plain synchronous example, but the runner is configured
    with ``RunnerConfig(adaptive=True)``.  Events for which ``format_event``
    returns a falsy value are skipped.
    """
    # Banner uses real UTF-8 characters (the previous text was mojibake).
    print("\n── 7 · Adaptive streaming ──")
    runner = MACPRunner(llm_caller=_mock_llm, config=RunnerConfig(adaptive=True))
    for event in runner.stream(_create_graph()):
        text = format_event(event)
        if text:
            print(f"  {text}")


# ── Entry point ───────────────────────────────────────────────────────────────


def main():
    """Run every streaming example in sequence.

    The synchronous examples run first; each async example is then driven to
    completion with its own ``asyncio.run`` call.
    """
    example_sync_streaming()
    example_buffer()
    example_token_streaming()
    example_print_stream()
    example_adaptive()

    asyncio.run(example_async_streaming())
    asyncio.run(example_async_token_streaming())

    # Completion message uses the real check-mark emoji (was mojibake).
    print("\nAll streaming examples completed ✅")


# Run the examples only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()