"""
Streaming execution — no LLM required (uses a local mock).

Demonstrates LangGraph-style streaming with real-time output:

1. Synchronous streaming — iterate over events
2. Streaming with a StreamBuffer to collect results
3. Token-level streaming — word-by-word output
4. Async streaming
5. Async token-level streaming
6. print_stream() helper — handles formatting automatically
7. Adaptive streaming — topology may change during execution

Run:
    python -m examples.streaming_example
"""
import asyncio
from builder import build_property_graph
from core.agent import AgentProfile
from execution import (
MACPRunner,
RunnerConfig,
StreamBuffer,
StreamEventType,
format_event,
print_stream,
)
# ── Sample graph ──────────────────────────────────────────────────────────────
def _create_graph():
    """Build the demo pipeline graph: researcher -> writer -> editor.

    Returns a property graph with a task node attached, wired as a
    linear three-stage workflow over the sample query.
    """
    # (agent_id, display_name, persona, description) for each pipeline stage.
    specs = [
        (
            "researcher",
            "Research Specialist",
            "You are an AI researcher who explains technical concepts clearly.",
            "Gather and synthesise information about the topic.",
        ),
        (
            "writer",
            "Content Writer",
            "You are a skilled writer who creates engaging content.",
            "Transform research into readable content.",
        ),
        (
            "editor",
            "Editor",
            "You are an editor who ensures clarity and quality.",
            "Review and polish the final content.",
        ),
    ]
    agents = [
        AgentProfile(agent_id=aid, display_name=name, persona=persona, description=desc)
        for aid, name, persona, desc in specs
    ]
    return build_property_graph(
        agents,
        workflow_edges=[("researcher", "writer"), ("writer", "editor")],
        query="Explain how AI works in simple terms",
        include_task_node=True,
    )
# ── Mock LLM callables ─────────────────────────────────────────────────────────
def _mock_llm(prompt: str) -> str:
"""Return a canned response based on which agent is running."""
p = prompt.lower()
if "researcher" in p or "research" in p:
return (
"AI works by using mathematical models trained on large amounts of data "
"to recognise patterns and make predictions. Key components include "
"neural networks, machine learning algorithms, and training data. "
"Modern AI systems like ChatGPT use transformer architectures."
)
if "writer" in p or "content" in p:
return (
"## How AI Works: A Simple Guide\n\n"
"Imagine teaching a child to recognise cats by showing thousands of pictures. "
"AI works similarly β it learns from examples!\n\n"
"**The Basics:**\n"
"- Neural networks inspired by the human brain\n"
"- Learns patterns from massive data\n"
"- Makes predictions on new data"
)
return (
"# How AI Works: A Simple Guide\n\n"
"AI learns from examples, just like humans! By analysing patterns in data, "
"AI systems can recognise images, understand language, and have conversations.\n\n"
"Content reviewed and polished for clarity."
)
def _mock_streaming_llm(prompt: str):
    """Simulate token-level streaming: yield the canned reply word by word.

    Every token except the last carries a trailing space, so joining the
    tokens reproduces the full reply exactly.
    """
    words = _mock_llm(prompt).split(" ")
    last = len(words) - 1
    for idx, word in enumerate(words):
        yield word if idx == last else word + " "
async def _mock_async_llm(prompt: str) -> str:
    """Async variant of the mock LLM: brief simulated latency, then the canned reply."""
    await asyncio.sleep(0.05)  # pretend we waited on a network call
    reply = _mock_llm(prompt)
    return reply
async def _mock_async_streaming_llm(prompt: str):
    """Async token stream: yield the canned reply word by word with tiny delays.

    Mirrors ``_mock_streaming_llm`` — joining the yielded tokens reproduces
    the full reply exactly.
    """
    words = _mock_llm(prompt).split(" ")
    last = len(words) - 1
    for idx, word in enumerate(words):
        await asyncio.sleep(0.01)  # simulate per-token latency
        yield word if idx == last else word + " "
# ── Examples ──────────────────────────────────────────────────────────────────
def example_sync_streaming():
    """1. Iterate over raw stream events and print each formatted event."""
    # Header glyphs restored from mojibake in the scraped source ("──" / "·").
    print("\n── 1 · Synchronous streaming ──")
    runner = MACPRunner(llm_caller=_mock_llm)
    for event in runner.stream(_create_graph()):
        text = format_event(event)
        # format_event may return an empty string for events with no display text.
        if text:
            print(f" {text}")
def example_buffer():
    """2. Collect events in a StreamBuffer; print only AGENT_OUTPUT events."""
    # Header glyphs restored from mojibake in the scraped source ("──" / "·").
    print("\n── 2 · Streaming with buffer ──")
    runner = MACPRunner(llm_caller=_mock_llm)
    buf = StreamBuffer()
    for event in runner.stream(_create_graph()):
        buf.add(event)  # buffer everything; display only agent output below
        if event.event_type == StreamEventType.AGENT_OUTPUT:
            name = getattr(event, "agent_name", "?")
            content = getattr(event, "content", "")
            # Truncate long outputs; ellipsis restored from mojibake.
            print(f" [{name}] {content[:80]}…")
    print(f" Buffered events: {len(buf.events)}")
def example_token_streaming():
    """3. Token-level (word-by-word) streaming with a per-agent token count."""
    # Header glyphs restored from mojibake in the scraped source ("──" / "·").
    print("\n── 3 · Token streaming ──")
    config = RunnerConfig(enable_token_streaming=True)
    runner = MACPRunner(streaming_llm_caller=_mock_streaming_llm, config=config)
    current_agent = None
    tokens = 0
    for event in runner.stream(_create_graph()):
        if event.event_type == StreamEventType.AGENT_START:
            # Close out the previous agent's line with its token count.
            if current_agent:
                print(f" ({tokens} tokens)")
            current_agent = getattr(event, "agent_name", "?")
            tokens = 0
            print(f" [{current_agent}] ", end="", flush=True)
        elif event.event_type == StreamEventType.TOKEN:
            # flush=True so tokens appear immediately, not on line buffering.
            print(getattr(event, "token", ""), end="", flush=True)
            tokens += 1
        elif event.event_type == StreamEventType.RUN_END:
            answer = getattr(event, "final_answer", "")
            # Ellipsis restored from mojibake in the scraped source.
            print(f"\n Done. Answer: {answer[:60]}…")
async def example_async_streaming():
    """4. Async streaming: consume events from ``astream`` with ``async for``."""
    # Header glyphs restored from mojibake in the scraped source ("──" / "·").
    print("\n── 4 · Async streaming ──")
    runner = MACPRunner(async_llm_caller=_mock_async_llm)
    async for event in runner.astream(_create_graph()):
        if event.event_type == StreamEventType.AGENT_START:
            print(f" Starting: {getattr(event, 'agent_name', '?')}")
        elif event.event_type == StreamEventType.AGENT_OUTPUT:
            # Truncated previews; ellipses restored from mojibake.
            print(f" Output : {getattr(event, 'content', '')[:80]}…")
        elif event.event_type == StreamEventType.RUN_END:
            print(f" Done. Answer: {getattr(event, 'final_answer', '')[:60]}…")
async def example_async_token_streaming():
    """5. Async token-level streaming: print tokens as they arrive, per agent."""
    # Header glyphs restored from mojibake in the scraped source ("──" / "·").
    print("\n── 5 · Async token streaming ──")
    config = RunnerConfig(enable_token_streaming=True)
    runner = MACPRunner(async_streaming_llm_caller=_mock_async_streaming_llm, config=config)
    current = None
    async for event in runner.astream(_create_graph()):
        if event.event_type == StreamEventType.AGENT_START:
            # Terminate the previous agent's token line before starting a new one.
            if current:
                print()
            current = getattr(event, "agent_name", "?")
            print(f" [{current}] ", end="", flush=True)
        elif event.event_type == StreamEventType.TOKEN:
            print(getattr(event, "token", ""), end="", flush=True)
        elif event.event_type == StreamEventType.RUN_END:
            print("\n Done.")
def example_print_stream():
    """6. print_stream() helper — handles formatting automatically."""
    # Header glyphs restored from mojibake in the scraped source ("──" / "·").
    print("\n── 6 · print_stream() helper ──")
    runner = MACPRunner(llm_caller=_mock_llm)
    # print_stream consumes the event iterator and returns the final answer
    # (or None), which we only measure here.
    answer = print_stream(runner.stream(_create_graph()), show_tokens=False, verbose=True)
    print(f" Returned answer length: {len(answer or '')}")
def example_adaptive():
    """7. Adaptive execution — topology may change during the run."""
    # Header glyphs restored from mojibake in the scraped source ("──" / "·").
    print("\n── 7 · Adaptive streaming ──")
    runner = MACPRunner(llm_caller=_mock_llm, config=RunnerConfig(adaptive=True))
    for event in runner.stream(_create_graph()):
        text = format_event(event)
        if text:
            print(f" {text}")
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
    """Run every streaming example in sequence (sync first, then async)."""
    example_sync_streaming()
    example_buffer()
    example_token_streaming()
    example_print_stream()
    example_adaptive()
    # Async examples each get their own event loop via asyncio.run.
    asyncio.run(example_async_streaming())
    asyncio.run(example_async_token_streaming())
    # In the scraped source this literal was split across two lines by a
    # garbled glyph; assumed to be "✅" — confirm against the upstream file.
    print("\nAll streaming examples completed ✅")


if __name__ == "__main__":
    main()