"""
Streaming execution β€” no LLM required (uses a local mock).
Demonstrates LangGraph-style streaming with real-time output:
1. Synchronous streaming β€” iterate over events
2. Streaming with a StreamBuffer to collect results
3. Token-level streaming β€” word-by-word output
4. Async streaming
5. Async token-level streaming
6. print_stream() helper β€” handles formatting automatically
7. Adaptive streaming β€” topology may change during execution
Run:
python -m examples.streaming_example
"""
import asyncio
from builder import build_property_graph
from core.agent import AgentProfile
from execution import (
MACPRunner,
RunnerConfig,
StreamBuffer,
StreamEventType,
format_event,
print_stream,
)
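# MACPRunner drives the graph, RunnerConfig toggles behaviour such as token
# streaming, StreamBuffer accumulates events for later inspection, and
# format_event / print_stream render events for the console (roles inferred
# from how this example uses them below).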
# ── Sample graph ──────────────────────────────────────────────────────────────
def _create_graph():
"""Three-agent pipeline: Researcher β†’ Writer β†’ Editor."""
agents = [
AgentProfile(
agent_id="researcher",
display_name="Research Specialist",
persona="You are an AI researcher who explains technical concepts clearly.",
description="Gather and synthesise information about the topic.",
),
AgentProfile(
agent_id="writer",
display_name="Content Writer",
persona="You are a skilled writer who creates engaging content.",
description="Transform research into readable content.",
),
AgentProfile(
agent_id="editor",
display_name="Editor",
persona="You are an editor who ensures clarity and quality.",
description="Review and polish the final content.",
),
]
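    # workflow_edges pins the pipeline order; include_task_node is assumed here
    # to attach the query as a shared task node that roots the workflow.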
return build_property_graph(
agents,
workflow_edges=[("researcher", "writer"), ("writer", "editor")],
query="Explain how AI works in simple terms",
include_task_node=True,
)
# ── Mock LLM callables ───────────────────────────────────────────────────────
def _mock_llm(prompt: str) -> str:
"""Return a canned response based on which agent is running."""
p = prompt.lower()
if "researcher" in p or "research" in p:
return (
"AI works by using mathematical models trained on large amounts of data "
"to recognise patterns and make predictions. Key components include "
"neural networks, machine learning algorithms, and training data. "
"Modern AI systems like ChatGPT use transformer architectures."
)
if "writer" in p or "content" in p:
return (
"## How AI Works: A Simple Guide\n\n"
"Imagine teaching a child to recognise cats by showing thousands of pictures. "
"AI works similarly β€” it learns from examples!\n\n"
"**The Basics:**\n"
"- Neural networks inspired by the human brain\n"
"- Learns patterns from massive data\n"
"- Makes predictions on new data"
)
return (
"# How AI Works: A Simple Guide\n\n"
"AI learns from examples, just like humans! By analysing patterns in data, "
"AI systems can recognise images, understand language, and have conversations.\n\n"
"Content reviewed and polished for clarity."
)
def _mock_streaming_llm(prompt: str):
"""Yield words one by one to simulate token-level streaming."""
words = _mock_llm(prompt).split(" ")
for i, word in enumerate(words):
yield word + (" " if i < len(words) - 1 else "")
async def _mock_async_llm(prompt: str) -> str:
await asyncio.sleep(0.05)
return _mock_llm(prompt)
async def _mock_async_streaming_llm(prompt: str):
words = _mock_llm(prompt).split(" ")
for i, word in enumerate(words):
await asyncio.sleep(0.01)
yield word + (" " if i < len(words) - 1 else "")
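# The mocks above stand in for a real model. A minimal sketch of a production
# streaming callable, assuming an OpenAI-style SDK (hypothetical wiring, not
# part of this repo; any client that yields incremental text chunks would do):
#
#   from openai import OpenAI
#   _client = OpenAI()
#
#   def real_streaming_llm(prompt: str):
#       stream = _client.chat.completions.create(
#           model="gpt-4o-mini",
#           messages=[{"role": "user", "content": prompt}],
#           stream=True,
#       )
#       for chunk in stream:
#           delta = chunk.choices[0].delta.content
#           if delta:  # the final chunk carries no content
#               yield delta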
# ── Examples ──────────────────────────────────────────────────────────────────
def example_sync_streaming():
"""1. Iterate over raw stream events."""
print("\n── 1 Β· Synchronous streaming ──")
runner = MACPRunner(llm_caller=_mock_llm)
for event in runner.stream(_create_graph()):
text = format_event(event)
if text:
print(f" {text}")
def example_buffer():
"""2. Collect events in a StreamBuffer; print only AGENT_OUTPUT."""
print("\n── 2 Β· Streaming with buffer ──")
runner = MACPRunner(llm_caller=_mock_llm)
buf = StreamBuffer()
for event in runner.stream(_create_graph()):
buf.add(event)
if event.event_type == StreamEventType.AGENT_OUTPUT:
name = getattr(event, "agent_name", "?")
content = getattr(event, "content", "")
print(f" [{name}] {content[:80]}…")
print(f" Buffered events: {len(buf.events)}")
def example_token_streaming():
"""3. Token-level (word-by-word) streaming."""
print("\n── 3 Β· Token streaming ──")
config = RunnerConfig(enable_token_streaming=True)
runner = MACPRunner(streaming_llm_caller=_mock_streaming_llm, config=config)
current_agent = None
tokens = 0
for event in runner.stream(_create_graph()):
if event.event_type == StreamEventType.AGENT_START:
if current_agent:
print(f" ({tokens} tokens)")
current_agent = getattr(event, "agent_name", "?")
tokens = 0
print(f" [{current_agent}] ", end="", flush=True)
elif event.event_type == StreamEventType.TOKEN:
print(getattr(event, "token", ""), end="", flush=True)
tokens += 1
elif event.event_type == StreamEventType.RUN_END:
answer = getattr(event, "final_answer", "")
print(f"\n Done. Answer: {answer[:60]}…")
async def example_async_streaming():
"""4. Async streaming."""
print("\n── 4 Β· Async streaming ──")
runner = MACPRunner(async_llm_caller=_mock_async_llm)
async for event in runner.astream(_create_graph()):
if event.event_type == StreamEventType.AGENT_START:
print(f" Starting: {getattr(event, 'agent_name', '?')}")
elif event.event_type == StreamEventType.AGENT_OUTPUT:
print(f" Output : {getattr(event, 'content', '')[:80]}…")
elif event.event_type == StreamEventType.RUN_END:
print(f" Done. Answer: {getattr(event, 'final_answer', '')[:60]}…")
async def example_async_token_streaming():
"""5. Async token-level streaming."""
print("\n── 5 Β· Async token streaming ──")
config = RunnerConfig(enable_token_streaming=True)
runner = MACPRunner(async_streaming_llm_caller=_mock_async_streaming_llm, config=config)
current = None
async for event in runner.astream(_create_graph()):
if event.event_type == StreamEventType.AGENT_START:
if current:
print()
current = getattr(event, "agent_name", "?")
print(f" [{current}] ", end="", flush=True)
elif event.event_type == StreamEventType.TOKEN:
print(getattr(event, "token", ""), end="", flush=True)
elif event.event_type == StreamEventType.RUN_END:
print("\n Done.")
def example_print_stream():
"""6. print_stream() helper β€” handles formatting automatically."""
print("\n── 6 Β· print_stream() helper ──")
runner = MACPRunner(llm_caller=_mock_llm)
answer = print_stream(runner.stream(_create_graph()), show_tokens=False, verbose=True)
print(f" Returned answer length: {len(answer or '')}")
def example_adaptive():
"""7. Adaptive execution β€” topology may change during the run."""
print("\n── 7 Β· Adaptive streaming ──")
runner = MACPRunner(llm_caller=_mock_llm, config=RunnerConfig(adaptive=True))
for event in runner.stream(_create_graph()):
text = format_event(event)
if text:
print(f" {text}")
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
    # Run the examples in the order the module docstring numbers them (1–7).
    example_sync_streaming()
    example_buffer()
    example_token_streaming()
    asyncio.run(example_async_streaming())
    asyncio.run(example_async_token_streaming())
    example_print_stream()
    example_adaptive()
    print("\nAll streaming examples completed ✅")
if __name__ == "__main__":
main()